[jira] [Resolved] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

2018-06-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-23778.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21333
[https://github.com/apache/spark/pull/21333]

> SparkContext.emptyRDD confuses SparkContext.union
> -
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.0, 2.3.0
>Reporter: Stefano Pettini
>Priority: Trivial
> Fix For: 2.4.0
>
> Attachments: as_it_should_be.png, 
> partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it is empty, whether 
> it is partitioned or not should be a purely academic question. Unfortunately 
> that is not the case, and the issue has side effects:
> namely, it confuses the RDD union.
> When there are N classic RDDs partitioned the same way, the union is 
> implemented with the optimized PartitionerAwareUnionRDD, which retains the 
> common partitioner in the result. If one of the N RDDs happens to be an 
> emptyRDD, which has no partitioner, the union is implemented by just 
> appending all the partitions of the N RDDs, dropping the partitioner. But 
> there is no need for this, as the emptyRDD contains no elements. This results 
> in further unneeded shuffles once the result of the union is used.
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 
> 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. Removing it, it's possible to see that reduceByKey 
> introduces a shuffle, although all the input RDDs except the emptyRDD are 
> already partitioned the same way.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra 
> stage and shuffle.
> The union implementation should be changed to treat emptyRDDs as _partitioned 
> in a way compatible with any partitioner_, basically ignoring them when 
> deciding whether the common partitioner can be kept.
> Present since 1.3 at least.
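
A minimal sketch of the idea (not the actual patch merged in PR 21333), assuming a helper that simply drops empty RDDs before deciding whether the partitioner-aware union can be used:

{code:java}
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: RDDs with no partitions (such as emptyRDD) are skipped,
// so they cannot force union onto the partitioner-dropping code path.
def unionKeepingPartitioner[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
  val nonEmpty = rdds.filter(_.partitions.nonEmpty)
  if (nonEmpty.isEmpty) sc.emptyRDD[T] else sc.union(nonEmpty)
}
{code}

With such a filter, the example above keeps partitioner p on x and reduceByKey no longer needs an extra shuffle.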






[jira] [Assigned] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

2018-06-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-23778:
---

Assignee: Marco Gaido

> SparkContext.emptyRDD confuses SparkContext.union
> -
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.0, 2.3.0
>Reporter: Stefano Pettini
>Assignee: Marco Gaido
>Priority: Trivial
> Fix For: 2.4.0
>
> Attachments: as_it_should_be.png, 
> partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it is empty, whether 
> it is partitioned or not should be a purely academic question. Unfortunately 
> that is not the case, and the issue has side effects:
> namely, it confuses the RDD union.
> When there are N classic RDDs partitioned the same way, the union is 
> implemented with the optimized PartitionerAwareUnionRDD, which retains the 
> common partitioner in the result. If one of the N RDDs happens to be an 
> emptyRDD, which has no partitioner, the union is implemented by just 
> appending all the partitions of the N RDDs, dropping the partitioner. But 
> there is no need for this, as the emptyRDD contains no elements. This results 
> in further unneeded shuffles once the result of the union is used.
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 
> 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. Removing it, it's possible to see that reduceByKey 
> introduces a shuffle, although all the input RDDs except the emptyRDD are 
> already partitioned the same way.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra 
> stage and shuffle.
> The union implementation should be changed to treat emptyRDDs as _partitioned 
> in a way compatible with any partitioner_, basically ignoring them when 
> deciding whether the common partitioner can be kept.
> Present since 1.3 at least.






[jira] [Created] (SPARK-24598) SPARK SQL:Datatype overflow conditions gives incorrect result

2018-06-19 Thread navya (JIRA)
navya created SPARK-24598:
-

 Summary: SPARK SQL:Datatype overflow conditions gives incorrect 
result
 Key: SPARK-24598
 URL: https://issues.apache.org/jira/browse/SPARK-24598
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: navya


Execute a SQL query that results in an overflow condition.

EX - SELECT 9223372036854775807 + 1 result = -9223372036854776000

 

Expected result - an error should be thrown, as in MySQL:

mysql> SELECT 9223372036854775807 + 1;

ERROR 1690 (22003): BIGINT value is out of range in '(9223372036854775807 + 1)'
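
For reference, a minimal JVM-level illustration of the silent wrap-around (plain Scala, independent of Spark SQL's literal typing):

{code:java}
// Two's-complement wrap-around: no error is raised on overflow.
val x: Long = Long.MaxValue   // 9223372036854775807
println(x + 1)                // prints -9223372036854775808
{code}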






[jira] [Commented] (SPARK-24493) Kerberos Ticket Renewal is failing in Hadoop 2.8+ and Hadoop 3

2018-06-19 Thread Saisai Shao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517720#comment-16517720
 ] 

Saisai Shao commented on SPARK-24493:
-

This bug is fixed by HDFS-12670; Spark should bump its supported Hadoop 3 
version to 3.1.1 once it is released.

> Kerberos Ticket Renewal is failing in Hadoop 2.8+ and Hadoop 3
> --
>
> Key: SPARK-24493
> URL: https://issues.apache.org/jira/browse/SPARK-24493
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, YARN
>Affects Versions: 2.3.0
>Reporter: Asif M
>Priority: Major
>
> Kerberos ticket renewal is failing on a long-running Spark job. I added the 
> 2 Kerberos properties below to the HDFS configuration and ran a Spark 
> streaming job 
> ([hdfs_wordcount.py|https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py])
> {noformat}
> dfs.namenode.delegation.token.max-lifetime=1800000 (30min)
> dfs.namenode.delegation.token.renew-interval=900000 (15min)
> {noformat}
>  
> The Spark job failed at 15 min with the error below:
> {noformat}
> 18/06/04 18:56:51 INFO DAGScheduler: ShuffleMapStage 10896 (call at 
> /usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py:2381)
>  failed in 0.218 s due to Job aborted due to stage failure: Task 0 in stage 
> 10896.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10896.0 
> (TID 7290, , executor 1): 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (token for abcd: HDFS_DELEGATION_TOKEN owner=a...@example.com, 
> renewer=yarn, realUser=, issueDate=1528136773875, maxDate=1528138573875, 
> sequenceNumber=38, masterKeyId=6) is expired, current time: 2018-06-04 
> 18:56:51,276+ expected renewal time: 2018-06-04 18:56:13,875+
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
> at org.apache.hadoop.ipc.Client.call(Client.java:1445)
> at org.apache.hadoop.ipc.Client.call(Client.java:1355)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:856)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:845)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:834)
> at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:998)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
> at 
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:86)
> at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:189)
> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:186)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:141)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at 

[jira] [Commented] (SPARK-10781) Allow certain number of failed tasks and allow job to succeed

2018-06-19 Thread Hieu Tri Huynh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517716#comment-16517716
 ] 

Hieu Tri Huynh commented on SPARK-10781:


I attached a proposed solution for this Jira. Hope to receive opinions from all 
of you. Thank you.

[^SPARK_10781_Proposed_Solution.pdf]

> Allow certain number of failed tasks and allow job to succeed
> -
>
> Key: SPARK-10781
> URL: https://issues.apache.org/jira/browse/SPARK-10781
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.5.0
>Reporter: Thomas Graves
>Priority: Major
> Attachments: SPARK_10781_Proposed_Solution.pdf
>
>
> MapReduce has the configs mapreduce.map.failures.maxpercent and 
> mapreduce.reduce.failures.maxpercent, which allow a certain percentage of 
> tasks to fail while still letting the job succeed.
> This could be a useful feature in Spark as well, for jobs that don't need all 
> tasks to be successful.
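
For reference, a sketch of how the equivalent knobs are set through the Hadoop MapReduce API (JobConf setters; a Spark-side equivalent does not exist yet and is what this ticket and the attached proposal are about):

{code:java}
import org.apache.hadoop.mapred.JobConf

val conf = new JobConf()
// Allow up to 5% of map tasks and 5% of reduce tasks to fail
// while still letting the job as a whole succeed.
conf.setMaxMapTaskFailuresPercent(5)
conf.setMaxReduceTaskFailuresPercent(5)
{code}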






[jira] [Updated] (SPARK-10781) Allow certain number of failed tasks and allow job to succeed

2018-06-19 Thread Hieu Tri Huynh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hieu Tri Huynh updated SPARK-10781:
---
Attachment: SPARK_10781_Proposed_Solution.pdf

> Allow certain number of failed tasks and allow job to succeed
> -
>
> Key: SPARK-10781
> URL: https://issues.apache.org/jira/browse/SPARK-10781
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.5.0
>Reporter: Thomas Graves
>Priority: Major
> Attachments: SPARK_10781_Proposed_Solution.pdf
>
>
> MapReduce has the configs mapreduce.map.failures.maxpercent and 
> mapreduce.reduce.failures.maxpercent, which allow a certain percentage of 
> tasks to fail while still letting the job succeed.
> This could be a useful feature in Spark as well, for jobs that don't need all 
> tasks to be successful.






[jira] [Resolved] (SPARK-24593) can not find hive table after spark streaming started

2018-06-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24593.
--
Resolution: Invalid

This sounds more like a question for now. Let's redirect questions to the dev/user 
mailing list and confirm it's really an issue before filing a JIRA.

> can not find hive table after spark streaming started
> -
>
> Key: SPARK-24593
> URL: https://issues.apache.org/jira/browse/SPARK-24593
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, SQL
>Affects Versions: 2.3.0, 2.3.1
> Environment: {code:java}
> // demo code
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.spark.examples.streaming;
> import java.util.Arrays;
> import java.util.regex.Pattern;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> /**
> * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text 
> received from the
> * network every second.
> *
> * Usage: JavaSqlNetworkWordCount <hostname> <port>
> * <hostname> and <port> describe the TCP server that Spark Streaming would 
> connect to receive data.
> *
> * To run this on your local machine, you need to first run a Netcat server
> * `$ nc -lk <port>`
> * and then run the example
> * `$ bin/run-example 
> org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost <port>`
> */
> public final class JavaSqlNetworkWordCount {
> private static final Pattern SPACE = Pattern.compile(" ");
> public static void main(String[] args) throws Exception {
> if (args.length < 2) {
> System.err.println("Usage: JavaNetworkWordCount  ");
> System.exit(1);
> }
> StreamingExamples.setStreamingLogLevels();
> // Create the context with a 1 second batch size
> SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
> Durations.seconds(1));
> // Create a JavaReceiverInputDStream on target ip:port and count the
> // words in input stream of \n delimited text (eg. generated by 'nc')
> // Note that no duplication in storage level only for running locally.
> // Replication necessary in distributed scenario for fault tolerance.
> JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
> JavaDStream<String> words = lines.flatMap(x -> 
> Arrays.asList(SPACE.split(x)).iterator());
> // Convert RDDs of the words DStream to DataFrame and run SQL query
> words.foreachRDD((rdd, time) -> {
> SparkSession spark = 
> JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
> // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
> JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
> JavaRecord record = new JavaRecord();
> record.setWord(word);
> return record;
> });
> Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
> // Creates a temporary view using the DataFrame
> wordsDataFrame.createOrReplaceTempView("words");
> // Do word count on table using SQL and print it
> Dataset<Row> wordCountsDataFrame =
> spark.sql("select word, count(*) as total from words group by word");
> System.out.println("= " + time + "=");
> wordCountsDataFrame.show();
> // accessing the hive table fails here
> spark.sql("select * from test_fch");
> });
> ssc.start();
> ssc.awaitTermination();
> }
> }
> /** Lazily instantiated singleton instance of SparkSession */
> class JavaSparkSessionSingleton {
> private static transient SparkSession instance = null;
> public static SparkSession getInstance(SparkConf sparkConf) {
> if (instance == null) {
> instance = SparkSession
> .builder()
> .config(sparkConf)
> .enableHiveSupport()
> 

[jira] [Resolved] (SPARK-24592) can not find hive table after spark streaming start

2018-06-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24592.
--
Resolution: Duplicate

Looks like a duplicate of SPARK-24593.

> can not find hive table after spark streaming start
> ---
>
> Key: SPARK-24592
> URL: https://issues.apache.org/jira/browse/SPARK-24592
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, SQL
>Affects Versions: 2.3.0, 2.3.1
> Environment:  
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.spark.examples.streaming;
> import java.util.Arrays;
> import java.util.regex.Pattern;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> /**
> * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text 
> received from the
> * network every second.
> *
> * Usage: JavaSqlNetworkWordCount <hostname> <port>
> * <hostname> and <port> describe the TCP server that Spark Streaming would 
> connect to receive data.
> *
> * To run this on your local machine, you need to first run a Netcat server
> * `$ nc -lk <port>`
> * and then run the example
> * `$ bin/run-example 
> org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost <port>`
> */
> public final class JavaSqlNetworkWordCount {
> private static final Pattern SPACE = Pattern.compile(" ");
> public static void main(String[] args) throws Exception {
> if (args.length < 2) {
> System.err.println("Usage: JavaNetworkWordCount  ");
> System.exit(1);
> }
> StreamingExamples.setStreamingLogLevels();
> // Create the context with a 1 second batch size
> SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
> Durations.seconds(1));
> // Create a JavaReceiverInputDStream on target ip:port and count the
> // words in input stream of \n delimited text (eg. generated by 'nc')
> // Note that no duplication in storage level only for running locally.
> // Replication necessary in distributed scenario for fault tolerance.
> JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
> JavaDStream<String> words = lines.flatMap(x -> 
> Arrays.asList(SPACE.split(x)).iterator());
> // Convert RDDs of the words DStream to DataFrame and run SQL query
> words.foreachRDD((rdd, time) -> {
> SparkSession spark = 
> JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
> // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
> JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
> JavaRecord record = new JavaRecord();
> record.setWord(word);
> return record;
> });
> Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
> // Creates a temporary view using the DataFrame
> wordsDataFrame.createOrReplaceTempView("words");
> // Do word count on table using SQL and print it
> Dataset<Row> wordCountsDataFrame =
> spark.sql("select word, count(*) as total from words group by word");
> System.out.println("= " + time + "=");
> wordCountsDataFrame.show();
> spark.sql("select * from test_fch");
> });
> ssc.start();
> ssc.awaitTermination();
> }
> }
> /** Lazily instantiated singleton instance of SparkSession */
> class JavaSparkSessionSingleton {
> private static transient SparkSession instance = null;
> public static SparkSession getInstance(SparkConf sparkConf) {
> if (instance == null) {
> instance = SparkSession
> .builder()
> .config(sparkConf)
> .enableHiveSupport()
> .getOrCreate();
> }
> return instance;
> }
> }
> {code}
>  
>Reporter: lhq
>Priority: Major
>
> org.apache.spark.sql.AnalysisException: Table 

[jira] [Resolved] (SPARK-24595) What about additional support on deeply nested column?

2018-06-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24595.
--
Resolution: Invalid

Let's redirect the question to dev/user mailing list before filing here as a 
JIRA.

> What about additional support on deeply nested column?
> --
>
> Key: SPARK-24595
> URL: https://issues.apache.org/jira/browse/SPARK-24595
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Zejun Li
>Priority: Major
>
> I store some trajectory data in Parquet with this schema:
> {code:java}
> create table traj(
>   id string,
>   points array<struct<
>     lat:   double,
>     lng:   double,
>     time:  bigint,
>     speed: double,
>     ... lots of attributes here
>     candidate_road: array<struct<..., score: double>>
>   >>
> ){code}
> It contains a lot of attributes that come from sensors. It also has a nested 
> array which contains information generated by a map-matching algorithm.
>  
> All of my algorithms that run on this dataset are trajectory-oriented, which means 
> they often iterate over points and use a subset of each point's attributes to 
> do some computation. With this schema I can get the points of a trajectory without 
> doing `group by` and `collect_list`.
>  
> Because Parquet works very well on deeply nested data, I store it directly 
> in Parquet format without flattening.
> It works very well with Impala, because Impala has special support for 
> nested data:
>  
> {code:java}
> select
>   id,
>   avg_speed
> from 
>   traj t,
>   (select avg(speed) avg_speed from t.points where time < '2018-06-19'){code}
> As you can see, Impala treats an array of structs as a nested table, and can do 
> computation on array elements at a per-row level. And Impala will use 
> Parquet's features to prune unused attributes in the point struct.
>  
> I use Spark for some complex algorithms which cannot be written in pure SQL. But 
> I run into trouble with the Spark DataFrame API:
> Spark cannot do schema pruning and filter push-down on nested columns. And it 
> seems there is no handy syntax to work with deeply nested data.
>  * `explode` does not help in my scenario, because I need to preserve the 
> trajectory-points hierarchy. If I use `explode` here, I need to do an extra 
> `group by` on `id` (see the sketch after this description).
>  * I can directly select `points.lat`, but that loses the structure. If 
> I need an array of (lat, lng) pairs, I need to zip two arrays. And it cannot work 
> at a deeper nesting level, such as selecting `points.candidate_road.score`. 
>  * Maybe I can use the parquet-mr package to read the files as an RDD and pass a 
> read schema directly to it. But this loses Hive integration and the vectorized 
> reader in Spark.
>  
> So I think it would be nice to have an Impala-style subquery syntax on complex 
> data, or could we add some support for schema projection on nested data, like:
>  
> {code:java}
> select id, extract(points, lat, lng, extract(candidate_road, score)) from 
> traj{code}
> which would produce a schema like:
>  
> {code:java}
> |- id string
> |- points array of struct
>    |- lat double
>    |- lng double
>    |- candidate_road array of struct
>       |- score double{code}
> Then the user can work with points using the desired schema, with data pruning in Parquet.
>  
>  
> Or is there some existing syntax that already does what I need?
>  
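
For reference, the explode-based workaround mentioned in the first bullet above looks roughly like this (a sketch against the traj schema; it flattens the hierarchy and needs the extra group-by the reporter wants to avoid):

{code:java}
import org.apache.spark.sql.functions._

val perPoint = spark.table("traj")
  .select(col("id"), explode(col("points")).as("p"))   // one row per point, hierarchy lost
val avgSpeed = perPoint
  .groupBy("id")                                        // extra group-by to rebuild it
  .agg(avg("p.speed").as("avg_speed"))
{code}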






[jira] [Resolved] (SPARK-24336) Support 'pass through' transformation in BasicOperators

2018-06-19 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-24336.
--
Resolution: Invalid

> Support 'pass through' transformation in BasicOperators
> ---
>
> Key: SPARK-24336
> URL: https://issues.apache.org/jira/browse/SPARK-24336
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jungtaek Lim
>Priority: Major
>
> As of now, BasicOperators enumerates all the cases that match a LogicalPlan to a 
> SparkPlan, but some of the cases are just simple transformations, described in 
> the code as 'pass through': they only need to pass the same parameters along, 
> converting child LogicalPlans to SparkPlans by calling `planLater`.
> We can automate this by leveraging reflection. ScalaReflection already has 
> many methods to leverage, so we should be good to go.
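
For context, the 'pass through' cases in BasicOperators look roughly like the following excerpt-style sketch (simplified from Spark's SparkStrategies; the proposal is to derive such cases via reflection instead of enumerating them by hand):

{code:java}
case logical.Sort(sortExprs, global, child) =>
  execution.SortExec(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
  execution.ProjectExec(projectList, planLater(child)) :: Nil
{code}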






[jira] [Created] (SPARK-24597) Spark ML Pipeline Should support non-linear models => DAGPipeline

2018-06-19 Thread Michael Dreibelbis (JIRA)
Michael Dreibelbis created SPARK-24597:
--

 Summary: Spark ML Pipeline Should support non-linear models => 
DAGPipeline
 Key: SPARK-24597
 URL: https://issues.apache.org/jira/browse/SPARK-24597
 Project: Spark
  Issue Type: New Feature
  Components: ML
Affects Versions: 2.3.1
Reporter: Michael Dreibelbis


Currently SparkML Pipeline/PipelineModel supports only a single, linear dataset 
transformation,

despite the documentation stating otherwise:

[reference 
documentation|https://spark.apache.org/docs/2.3.0/ml-pipeline.html#details] 

 I'm proposing implementing a DAGPipeline and supporting multiple datasets as 
input

The code could look something like this:

 
{code:java}
val ds1 = /*dataset 1 creation*/
val ds2 = /*dataset 2 creation*/

// nodes take on uid from estimator/transformer
val i1 = IdentityNode(new IdentityTransformer("i1"))
val i2 = IdentityNode(new IdentityTransformer("i2"))
val bi = TransformerNode(new Binarizer("bi"))
val cv = EstimatorNode(new CountVectorizer("cv"))
val idf = EstimatorNode(new IDF("idf"))
val j1 = JoinerNode(new Joiner("j1"))
val nodes = Array(i1, i2, bi, cv, idf, j1) // include the joiner so the edges below resolve
val edges = Array(
("i1", "cv"), ("cv", "idf"), ("idf", "j1"), 
("i2", "bi"), ("bi", "j1"))
val p = new DAGPipeline(nodes, edges)
.setIdentity("i1", ds1)
.setIdentity("i2", ds2)
val m = p.fit(spark.emptyDataFrame)
m.setIdentity("i1", ds1).setIdentity("i2", ds2)
m.transform(spark.emptyDataFrame)
{code}
 

 

         

          






[jira] [Assigned] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24596:


Assignee: (was: Apache Spark)

> Non-cascading Cache Invalidation
> 
>
> Key: SPARK-24596
> URL: https://issues.apache.org/jira/browse/SPARK-24596
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> When invalidating a cache, we also invalidate other caches that depend on it, to 
> ensure cached data is up to date. For example, when the underlying table has 
> been modified or the table itself has been dropped, all caches that use this 
> table should be invalidated or refreshed.
> However, in other cases, like when the user simply wants to drop a cache to free 
> up memory, we do not need to invalidate dependent caches since no underlying 
> data has been changed. For this reason, we would like to introduce a new 
> cache invalidation mode: non-cascading cache invalidation. We choose 
> between the existing mode and the new mode for different cache invalidation 
> scenarios:
>  # Drop tables and regular (persistent) views: regular mode
>  # Drop temporary views: non-cascading mode
>  # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
>  # Call DataSet.unpersist(): non-cascading mode
> Note that a regular (persistent) view is a database object just like a table, 
> so after dropping a regular view (whether cached or not), any query 
> referring to that view should no longer be valid. Hence if a cached persistent 
> view is dropped, we need to invalidate all dependent caches so that 
> exceptions will be thrown for any later reference. On the other hand, a 
> temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
> temporary view should have no impact on queries referencing that view. Thus 
> we should do non-cascading uncaching for temporary views, which also 
> guarantees consistent uncaching behavior between temporary views and 
> unnamed DataSets.






[jira] [Assigned] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24596:


Assignee: Apache Spark

> Non-cascading Cache Invalidation
> 
>
> Key: SPARK-24596
> URL: https://issues.apache.org/jira/browse/SPARK-24596
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Assignee: Apache Spark
>Priority: Major
> Fix For: 2.4.0
>
>
> When invalidating a cache, we also invalidate other caches that depend on it, to 
> ensure cached data is up to date. For example, when the underlying table has 
> been modified or the table itself has been dropped, all caches that use this 
> table should be invalidated or refreshed.
> However, in other cases, like when the user simply wants to drop a cache to free 
> up memory, we do not need to invalidate dependent caches since no underlying 
> data has been changed. For this reason, we would like to introduce a new 
> cache invalidation mode: non-cascading cache invalidation. We choose 
> between the existing mode and the new mode for different cache invalidation 
> scenarios:
>  # Drop tables and regular (persistent) views: regular mode
>  # Drop temporary views: non-cascading mode
>  # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
>  # Call DataSet.unpersist(): non-cascading mode
> Note that a regular (persistent) view is a database object just like a table, 
> so after dropping a regular view (whether cached or not), any query 
> referring to that view should no longer be valid. Hence if a cached persistent 
> view is dropped, we need to invalidate all dependent caches so that 
> exceptions will be thrown for any later reference. On the other hand, a 
> temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
> temporary view should have no impact on queries referencing that view. Thus 
> we should do non-cascading uncaching for temporary views, which also 
> guarantees consistent uncaching behavior between temporary views and 
> unnamed DataSets.






[jira] [Commented] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-19 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517599#comment-16517599
 ] 

Apache Spark commented on SPARK-24596:
--

User 'maryannxue' has created a pull request for this issue:
https://github.com/apache/spark/pull/21594

> Non-cascading Cache Invalidation
> 
>
> Key: SPARK-24596
> URL: https://issues.apache.org/jira/browse/SPARK-24596
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> When invalidating a cache, we also invalidate other caches that depend on it, to 
> ensure cached data is up to date. For example, when the underlying table has 
> been modified or the table itself has been dropped, all caches that use this 
> table should be invalidated or refreshed.
> However, in other cases, like when the user simply wants to drop a cache to free 
> up memory, we do not need to invalidate dependent caches since no underlying 
> data has been changed. For this reason, we would like to introduce a new 
> cache invalidation mode: non-cascading cache invalidation. We choose 
> between the existing mode and the new mode for different cache invalidation 
> scenarios:
>  # Drop tables and regular (persistent) views: regular mode
>  # Drop temporary views: non-cascading mode
>  # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
>  # Call DataSet.unpersist(): non-cascading mode
> Note that a regular (persistent) view is a database object just like a table, 
> so after dropping a regular view (whether cached or not), any query 
> referring to that view should no longer be valid. Hence if a cached persistent 
> view is dropped, we need to invalidate all dependent caches so that 
> exceptions will be thrown for any later reference. On the other hand, a 
> temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
> temporary view should have no impact on queries referencing that view. Thus 
> we should do non-cascading uncaching for temporary views, which also 
> guarantees consistent uncaching behavior between temporary views and 
> unnamed DataSets.






[jira] [Resolved] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-19 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24583.
-
   Resolution: Fixed
 Assignee: Maryann Xue
Fix Version/s: 2.3.2

> Wrong schema type in InsertIntoDataSourceCommand
> 
>
> Key: SPARK-24583
> URL: https://issues.apache.org/jira/browse/SPARK-24583
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Assignee: Maryann Xue
>Priority: Major
> Fix For: 2.3.2, 2.4.0
>
>
> For a DataSource table whose schema contains a field with "nullable=false", 
> when the user tries to insert a NULL value into this field, the input DataFrame 
> will return an incorrect value or throw a NullPointerException. That is 
> because the schema nullability of the input relation is bluntly overridden 
> with the destination schema by the code below in 
> {{InsertIntoDataSourceCommand}}:
> {code:java}
>   override def run(sparkSession: SparkSession): Seq[Row] = {
> val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
> val data = Dataset.ofRows(sparkSession, query)
> // Apply the schema of the existing table to the new data.
> val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, 
> logicalRelation.schema)
> relation.insert(df, overwrite)
> // Re-cache all cached plans(including this relation itself, if it's 
> cached) that refer to this
> // data source relation.
> sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, 
> logicalRelation)
> Seq.empty[Row]
>   }
> {code}
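
One way to avoid the blunt override, sketched below as an illustration of the idea (not necessarily the change that was merged for this ticket), is to keep the nullability reported by the incoming query when building the DataFrame:

{code:java}
// Sketch: preserve the input query's own schema nullability instead of
// stamping the destination schema onto it wholesale.
val data = Dataset.ofRows(sparkSession, query)
val df = sparkSession.internalCreateDataFrame(
  data.queryExecution.toRdd,
  data.schema)   // was: logicalRelation.schema
relation.insert(df, overwrite)
{code}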






[jira] [Created] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-19 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24596:
---

 Summary: Non-cascading Cache Invalidation
 Key: SPARK-24596
 URL: https://issues.apache.org/jira/browse/SPARK-24596
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


When invalidating a cache, we also invalidate other caches that depend on it, to 
ensure cached data is up to date. For example, when the underlying table has 
been modified or the table itself has been dropped, all caches that use this 
table should be invalidated or refreshed.

However, in other cases, like when the user simply wants to drop a cache to free up 
memory, we do not need to invalidate dependent caches since no underlying data 
has been changed. For this reason, we would like to introduce a new cache 
invalidation mode: non-cascading cache invalidation. We choose between 
the existing mode and the new mode for different cache invalidation scenarios:
 # Drop tables and regular (persistent) views: regular mode
 # Drop temporary views: non-cascading mode
 # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
 # Call DataSet.unpersist(): non-cascading mode

Note that a regular (persistent) view is a database object just like a table, 
so after dropping a regular view (whether cached or not), any query 
referring to that view should no longer be valid. Hence if a cached persistent 
view is dropped, we need to invalidate all dependent caches so that 
exceptions will be thrown for any later reference. On the other hand, a 
temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
temporary view should have no impact on queries referencing that view. Thus we 
should do non-cascading uncaching for temporary views, which also guarantees 
consistent uncaching behavior between temporary views and unnamed DataSets.
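
A short sketch of how the two modes would map onto user actions (the API calls below already exist in Spark; the cascading vs. non-cascading split is what this ticket proposes):

{code:java}
val base = spark.range(100).selectExpr("id", "id * 2 AS twice").cache()
base.createOrReplaceTempView("base_v")
val derived = spark.table("base_v").filter("twice > 10").cache()

base.unpersist()                          // proposed: non-cascading, derived's cache stays usable
spark.sql("DROP VIEW IF EXISTS base_v")   // dropping a temporary view: also non-cascading
// Dropping a table/persistent view or modifying table contents
// (INSERT/UPDATE/MERGE/DELETE) would still invalidate dependent caches (regular mode).
{code}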






[jira] [Resolved] (SPARK-24565) Add API in Structured Streaming for exposing output rows of each microbatch as a DataFrame

2018-06-19 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-24565.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21571
[https://github.com/apache/spark/pull/21571]

> Add API in Structured Streaming for exposing output rows of each 
> microbatch as a DataFrame
> --
>
> Key: SPARK-24565
> URL: https://issues.apache.org/jira/browse/SPARK-24565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently, the micro-batches in the MicroBatchExecution are not exposed to the 
> user through any public API. This was because we did not want to expose the 
> micro-batches, so that all the APIs we expose can eventually be supported 
> in the Continuous engine. But now that we have a better sense of building a 
> ContinuousExecution, I am considering adding APIs which will run only in the 
> MicroBatchExecution. I have quite a few use cases where exposing the 
> micro-batch output as a dataframe is useful. 
> - Pass the output rows of each batch to a library that is designed only for 
> batch jobs (for example, many ML libraries need to collect() while learning).
> - Reuse batch data sources for output whose streaming version does not exist 
> (e.g. the redshift data source).
> - Write the output rows to multiple places by writing twice for each batch. 
> This is not the most elegant thing to do for multiple-output streaming 
> queries but is likely to be better than running two streaming queries 
> processing the same data twice.
> The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to 
> Scala/Java/Python {{DataStreamWriter}}.
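
A sketch of how the proposed method would be used (the {{foreachBatch}} call follows the signature proposed above and is the new piece; the rest is existing structured streaming code, and the output path is just an arbitrary example):

{code:java}
import org.apache.spark.sql.DataFrame

val streamingDF = spark.readStream
  .format("rate")        // built-in test source that emits rows continuously
  .load()

val query = streamingDF.writeStream
  .foreachBatch { batchDF: DataFrame =>
    // reuse any batch sink for the output rows of each micro-batch
    batchDF.write.mode("append").parquet("/tmp/microbatch-output")
  }
  .start()
{code}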






[jira] [Comment Edited] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517532#comment-16517532
 ] 

Wenbo Zhao edited comment on SPARK-24578 at 6/19/18 8:55 PM:
-

woop, [~attilapiros], sorry, I didn't know you have created a PR. 


was (Author: wbzhao):
woop, [~attilapiros], sorry, I didn't you have created a PR. 

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproduction for a cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517532#comment-16517532
 ] 

Wenbo Zhao commented on SPARK-24578:


woop, [~attilapiros], sorry, I didn't you have created a PR. 

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproduction for a cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517529#comment-16517529
 ] 

Apache Spark commented on SPARK-24578:
--

User 'WenboZhao' has created a pull request for this issue:
https://github.com/apache/spark/pull/21593

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517515#comment-16517515
 ] 

Attila Zsolt Piros commented on SPARK-24578:


[~wbzhao] Oh sorry, I read your comment late; without your help (pointing to 
the right commit that caused the problem) it would definitely have taken much, 
much longer. If you like, please create another PR, that is fine with me.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()

[jira] [Resolved] (SPARK-24534) Add a way to bypass entrypoint.sh script if no spark cmd is passed

2018-06-19 Thread Erik Erlandson (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Erlandson resolved SPARK-24534.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Add a way to bypass entrypoint.sh script if no spark cmd is passed
> --
>
> Key: SPARK-24534
> URL: https://issues.apache.org/jira/browse/SPARK-24534
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Ricardo Martinelli de Oliveira
>Priority: Minor
> Fix For: 2.4.0
>
>
> As an improvement to the entrypoint.sh script, I'd like to propose that the 
> Spark entrypoint do a passthrough if driver/executor/init is not the command 
> passed. Currently it raises an error.
> To be more specific, I'm talking about these lines:
> [https://github.com/apache/spark/blob/master/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L113-L114]
> This allows the openshift-spark image to continue to function as a Spark 
> Standalone component, with custom configuration support etc., without 
> compromising the previous method of configuring the cluster inside a Kubernetes 
> environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24578:


Assignee: Apache Spark

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Assignee: Apache Spark
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> 

[jira] [Assigned] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24578:


Assignee: (was: Apache Spark)

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> df.groupBy("x1").agg(count("value")).show() }
> {code}
>  
> In the 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517505#comment-16517505
 ] 

Apache Spark commented on SPARK-24578:
--

User 'attilapiros' has created a pull request for this issue:
https://github.com/apache/spark/pull/21592

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517465#comment-16517465
 ] 

Wenbo Zhao commented on SPARK-24578:


[~attilapiros] If you don't mind, I could create a PR for it :)

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517461#comment-16517461
 ] 

Marcelo Vanzin commented on SPARK-24578:


Ah, I see. That makes sense. (I actually took a look at the Netty changes and 
didn't find anything in this area, so I was starting to wonder.)

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517458#comment-16517458
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~vanzin], the commit 
[https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a]
 is only included since 2.3.0. If I understand right, the root cause is that 
we no longer do `consolidateIfNeeded` for many small chunks, which makes 
`buf.nioBuffer()` perform badly when we have to call `copyByteBuf()` many 
times. In my example, "many times" = 500MB / 256 KB, i.e. roughly 2000 copies per block.

[~attilapiros] I applied 
{code:java}
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), 
Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code}
in our Spark distribution and ran the test code from this JIRA. It works fine. 
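
For illustration only, here is a minimal Scala sketch of the idea behind that 
change. The helper name (copyInChunks), the 256 KB value of NIO_BUFFER_LIMIT, 
and the blocking channel are assumptions; it shows the chunk-at-a-time 
nioBuffer() conversion, not the actual MessageWithHeader.copyByteBuf code.

{code:language=scala}
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

import io.netty.buffer.ByteBuf

// Assumption: the same 256 KB limit referred to in the comments above.
val NIO_BUFFER_LIMIT = 256 * 1024

// Copy a ByteBuf to a channel in slices of at most NIO_BUFFER_LIMIT bytes,
// instead of asking nioBuffer() for one huge buffer covering the whole block.
// A blocking channel is assumed so that write() always makes progress.
def copyInChunks(buf: ByteBuf, target: WritableByteChannel): Long = {
  var written = 0L
  while (buf.readableBytes() > 0) {
    val length = math.min(buf.readableBytes(), NIO_BUFFER_LIMIT)
    val chunk: ByteBuffer = buf.nioBuffer(buf.readerIndex(), length)
    val n = target.write(chunk)
    buf.skipBytes(n) // advance readerIndex by the bytes actually written
    written += n
  }
  written
}
{code}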

 

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517452#comment-16517452
 ] 

Attila Zsolt Piros commented on SPARK-24578:


[~wbzhao] Yes, you are right, readerIndex is the first argument (I just checked 
skipBytes), thanks.

[~irashid] Yes, I can create a PR soon.

[~vanzin] Netty was upgraded between 2.2.1 and 2.3.0, from 4.0.43.Final to 
4.1.17.Final. Could that be it?

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517449#comment-16517449
 ] 

Marcelo Vanzin commented on SPARK-24578:


Attila's suggestion looks good, but I wonder what caused this to show up in 
2.3... the code he's changing has been there since 2.0, so my guess would be 
something changed in Netty (which was upgraded in 2.3). Anyway, just curious 
about the underlying cause of the change.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> 

[jira] [Resolved] (SPARK-12436) If all values of a JSON field is null, JSON's inferSchema should return NullType instead of StringType

2018-06-19 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-12436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng resolved SPARK-12436.
---
Resolution: Resolved

Resolved by SPARK-23772 (an alternative solution to this JIRA)

> If all values of a JSON field is null, JSON's inferSchema should return 
> NullType instead of StringType
> --
>
> Key: SPARK-12436
> URL: https://issues.apache.org/jira/browse/SPARK-12436
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Major
>  Labels: starter
>
> Right now, JSON's inferSchema will return {{StringType}} for a field that 
> always has null values or an {{ArrayType(StringType)}}  for a field that 
> always has empty array values. Although this behavior makes writing JSON data 
> to other data sources easy (i.e. when writing data, we do not need to remove 
> those {{NullType}} or {{ArrayType(NullType)}} columns), it makes it hard for 
> downstream applications to reason about the actual schema of the data and thus 
> makes schema merging hard. We should allow JSON's inferSchema to return 
> {{NullType}} and {{ArrayType(NullType)}}. Also, we need to make sure that when we write 
> data out, we should remove those {{NullType}} or {{ArrayType(NullType)}} 
> columns first. 
> Besides  {{NullType}} and {{ArrayType(NullType)}}, we may need to do the same 
> thing for empty {{StructType}}s (i.e. a {{StructType}} having 0 fields). 
> To finish this work, we need to finish the following sub-tasks:
> * Allow JSON's inferSchema to return {{NullType}} and {{ArrayType(NullType)}}.
> * Determine whether we need to add the operation of removing {{NullType}} and 
> {{ArrayType(NullType)}} columns from the data that will be written out for all 
> data sources (i.e. data sources based on our data source API and Hive tables). 
> Or, we should just add this operation for certain data sources (e.g. 
> Parquet). For example, we may not need this operation for Hive because Hive 
> has VoidObjectInspector.
> * Implement the change and get it merged to Spark master.
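
As a quick way to see the inference behavior described above, here is a hedged 
spark-shell sketch, assuming an active SparkSession named {{spark}}; the 
commented schema is the result the description complains about, not a guaranteed 
output on every version.

{code:language=scala}
import spark.implicits._

// Two JSON records where field "a" is always null and "b" is always an empty array.
val jsonLines = Seq("""{"a": null, "b": []}""", """{"a": null, "b": []}""").toDS()

val df = spark.read.json(jsonLines)
df.printSchema()
// Per the description above, "a" is inferred as string and "b" as array<string>,
// rather than NullType and ArrayType(NullType).
{code}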



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24587) RDD.takeOrdered uses reduce, pulling all partition data to the driver

2018-06-19 Thread Ryan Deak (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Deak updated SPARK-24587:
--
Description: 
*NOTE*: _This is likely a *very* impactful change, and likely only matters when 
{{num}} is large, but without something like the proposed change, algorithms 
based on distributed {{top-K}} don't scale very well._

h2. Description

{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
 uses 
{{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
 to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is 
the size of the returned {{Array}}.  Consequently, even when the size of the 
return value is small, relative to the driver memory, errors can occur.

An example error is:

{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28 
tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29 
tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160 
tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}

It's clear from this message that although the final result will be only about 
*0.3 GB* ({{46.4/160}}), the amount of driver memory required to combine the 
partial results is more than {{46 GB}}.

h2. Proposed Solution

The amount of memory required can be dramatically reduced by using 
{{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
  For instance, replacing the {{else}} clause with:

{code:language=scala}
else {
  import scala.math.{ceil, log, max}
  val depth = max(1, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)

  mapRDDs.treeReduce(
(queue1, queue2) => queue1 ++= queue2,
depth
  ).toArray.sorted(ord)
}
{code}

This should require less than double the network communication but should scale 
to much larger values of the {{num}} parameter without configuration changes or 
beefier machines.
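
To make the shape of that change concrete, the following is a minimal, hedged 
sketch of a tree-style top-K. The name {{treeTakeOrdered}} is hypothetical, it 
assumes a non-empty RDD, and it uses plain sorted arrays instead of Spark's 
internal {{BoundedPriorityQueue}}, so it illustrates the approach rather than 
the exact patch:

{code:language=scala}
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Hypothetical helper mirroring the treeReduce-based proposal above.
// Returns the `num` smallest elements per `ord`, like RDD.takeOrdered.
def treeTakeOrdered[T: ClassTag](rdd: RDD[T], num: Int)(implicit ord: Ordering[T]): Array[T] = {
  // Per-partition top-K kept as a small sorted array (cheap to serialize).
  val mapRDDs = rdd.mapPartitions { items =>
    Iterator.single(items.toArray.sorted(ord).take(num))
  }
  val depth = math.max(1, math.ceil(math.log(mapRDDs.partitions.length) / math.log(2)).toInt)
  // treeReduce merges the partial results in `depth` rounds instead of pulling
  // every partition's top-K to the driver at once, as a plain reduce would.
  mapRDDs.treeReduce((a, b) => (a ++ b).sorted(ord).take(num), depth)
}

// Example: the 10 smallest values of a 200-partition RDD.
// treeTakeOrdered(sc.parallelize(1 to 1000000, 200), 10)
{code}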

h2. Code Potentially Impacted

* ML Lib's 
{{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}

  was:
*NOTE*: _This is likely a *very* impactful change, and likely only matters when 
{{num}} is large, but without something like the proposed change, algorithms 
based on distributed {{top-K}} don't scale very well._

h2. Description

{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
 uses 
{{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
 to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is 
the size of the returned {{Array}}.  Consequently, even when the size of the 
return value is small, relative to the driver memory, errors can occur.

An example error is:

{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28 
tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29 
tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160 
tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}

It's clear from this message that although the resulting size of the result 
will be approximately *0.3 GB*  ({{46.4/160}}), the amount of driver memory 
required to combine the results is more than {{46 GB}}.

h2. Proposed Solution

This amount of memory required can be dramatically reduced by using 
{{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
  For instance replacing the {{else}} clause with:

{code:language=scala}
else {
  import scala.math.{ceil, log, max}
  val depth = max(2, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)

  mapRDDs.treeReduce(
(queue1, queue2) => queue1 ++= queue2,
depth
  ).toArray.sorted(ord)
}
{code}

This should require less than double the network communication but should scale 
to much larger values of the {{num}} parameter without configuration changes or 
beefier machines.

h2. Code Potentially Impacted

* ML Lib's 
{{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}


> RDD.takeOrdered uses reduce, pulling all partition data to the driver
> -
>
> Key: SPARK-24587
> URL: 

[jira] [Updated] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-06-19 Thread Iqbal Singh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Iqbal Singh updated SPARK-24295:

Priority: Major  (was: Minor)

> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Major
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file 
> after a defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing 
> slowness while reading the data from the FileStreamSinkLog dir, as Spark 
> defaults to the "__spark__metadata" dir for the read.
> We need functionality to purge the compact file data.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-06-19 Thread Iqbal Singh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Iqbal Singh updated SPARK-24295:

Issue Type: Bug  (was: Wish)

> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Minor
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file 
> after a defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing 
> slowness while reading the data from the FileStreamSinkLog dir, as Spark 
> defaults to the "__spark__metadata" dir for the read.
> We need functionality to purge the compact file data.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517441#comment-16517441
 ] 

Wenbo Zhao commented on SPARK-24578:


[~irashid] Yes, that is exactly what I saw on our side.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517440#comment-16517440
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~attilapiros], I guess what you suggest is 
{code:java}
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), 
Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code}
I will report back on how my test goes.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517439#comment-16517439
 ] 

Imran Rashid commented on SPARK-24578:
--

I think [~attilapiros] may be right -- can you send a PR to fix?

We were able to reproduce the issue with the code you gave, and noticed that 
the errors occur exactly at the two minute timeout.  The receiver closes the 
connection because of the timeout, so then the sender just has a generic 
failure that the connection was closed.

Sender:
{noformat}
18/06/18 10:12:08 ERROR server.TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=879251518000, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@19b0a640} to 
/xx:38710; closing connection
java.io.IOException: Connection reset by peer
{noformat}

Receiver:
{noformat}
18/06/18 10:12:08 ERROR server.TransportChannelHandler: Connection to 
xxx.com/:38765 has been quiet for 120000 ms while there are outstanding 
requests. Assuming connection is dead; please adjust spark.network.timeout if 
this is wrong.
18/06/18 10:12:08 ERROR client.TransportResponseHandler: Still have 1 requests 
outstanding when connection from x.com/:38765 is closed
18/06/18 10:12:08 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms
{noformat}

At first I found it hard to believe this slowdown would lead to the connection 
appearing totally quiet -- but I realized that there is actually a lot of 
concurrent activity going on, as all the executors are also sending and 
receiving many more blocks from each other. So if you're simultaneously dealing 
with a bunch of large blocks, and we put this big slowdown inside the netty 
threads, this could lead to things appearing idle.

[~wbzhao] does this match the cases you see as well?

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517416#comment-16517416
 ] 

Attila Zsolt Piros commented on SPARK-24578:


I have written a small test. I know it is a bit naive, but I still think it shows 
us something:

[https://gist.github.com/attilapiros/730d67b62317d14f5fd0f6779adea245]

And the result is:
{noformat}
n = 500 
duration221 = 2 
duration221.new = 0 
duration230 = 5242 
duration230.new = 7
{noformat}
My guess is that the receiver times out and the sender is writing into a closed socket.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> 

[jira] [Commented] (SPARK-650) Add a "setup hook" API for running initialization code on each executor

2018-06-19 Thread Avi minsky (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517398#comment-16517398
 ] 

Avi minsky commented on SPARK-650:
--

We encountered an issue with the combination of lazy static loading and 
speculation.
Because speculation kills tasks, it might kill a task while it is loading lazy 
static classes, which makes them unusable, and later the whole application might 
fail with a NoClassDefFoundError.

> Add a "setup hook" API for running initialization code on each executor
> ---
>
> Key: SPARK-650
> URL: https://issues.apache.org/jira/browse/SPARK-650
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Reporter: Matei Zaharia
>Priority: Minor
>
> Would be useful to configure things like reporting libraries



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

2018-06-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-24556.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21564
[https://github.com/apache/spark/pull/21564]

> ReusedExchange should rewrite output partitioning also when child's 
> partitioning is RangePartitioning
> -
>
> Key: SPARK-24556
> URL: https://issues.apache.org/jira/browse/SPARK-24556
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: yucai
>Assignee: yucai
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently, ReusedExchange rewrites output partitioning if the child's 
> partitioning is HashPartitioning, but it does not do the same when the child's 
> partitioning is RangePartitioning; sometimes this can introduce an extra 
> shuffle, see:
> {code:java}
> val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
> val df1 = df.as("t1")
> val df2 = df.as("t2")
> val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
> t.cache.orderBy($"t2.j").explain
> {code}
> Before fix:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
>+- InMemoryTableScan [i#5, j#6, i#13, j#14]
>  +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>+- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>   :- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
>   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>   : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 
> 200)
>   :+- LocalTableScan [i#5, j#6]
>   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>  +- ReusedExchange [i#13, j#14], Exchange 
> rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
> Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 
> 200)", like:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>   +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
> +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>:- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>: +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>:+- LocalTableScan [i#5, j#6]
>+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>   +- ReusedExchange [i#13, j#14], Exchange 
> rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

2018-06-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-24556:
---

Assignee: yucai

> ReusedExchange should rewrite output partitioning also when child's 
> partitioning is RangePartitioning
> -
>
> Key: SPARK-24556
> URL: https://issues.apache.org/jira/browse/SPARK-24556
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: yucai
>Assignee: yucai
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently, ReusedExchange rewrites output partitioning if the child's 
> partitioning is HashPartitioning, but it does not do the same when the child's 
> partitioning is RangePartitioning; sometimes this can introduce an extra 
> shuffle, see:
> {code:java}
> val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
> val df1 = df.as("t1")
> val df2 = df.as("t2")
> val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
> t.cache.orderBy($"t2.j").explain
> {code}
> Before fix:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
>+- InMemoryTableScan [i#5, j#6, i#13, j#14]
>  +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>+- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>   :- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
>   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>   : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 
> 200)
>   :+- LocalTableScan [i#5, j#6]
>   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>  +- ReusedExchange [i#13, j#14], Exchange 
> rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
> Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 
> 200)", like:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>   +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
> +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>:- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>: +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>:+- LocalTableScan [i#5, j#6]
>+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>   +- ReusedExchange [i#13, j#14], Exchange 
> rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24521) Fix ineffective test in CachedTableSuite

2018-06-19 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24521.
-
   Resolution: Fixed
Fix Version/s: 2.4.0
   2.3.2

> Fix ineffective test in CachedTableSuite
> 
>
> Key: SPARK-24521
> URL: https://issues.apache.org/jira/browse/SPARK-24521
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Li Jin
>Assignee: Li Jin
>Priority: Minor
> Fix For: 2.3.2, 2.4.0
>
>
> test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite 
> doesn't not work because:
> The UDF is executed and test count incremented when "df.cache()" is called 
> and the subsequent "df.collect()" has no effect on the test result. 
> link:
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala#L86



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24521) Fix ineffective test in CachedTableSuite

2018-06-19 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-24521:
---

Assignee: Li Jin

> Fix ineffective test in CachedTableSuite
> 
>
> Key: SPARK-24521
> URL: https://issues.apache.org/jira/browse/SPARK-24521
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Li Jin
>Assignee: Li Jin
>Priority: Minor
>
> test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite 
> doesn't not work because:
> The UDF is executed and test count incremented when "df.cache()" is called 
> and the subsequent "df.collect()" has no effect on the test result. 
> link:
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala#L86



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23427) spark.sql.autoBroadcastJoinThreshold causing OOM exception in the driver

2018-06-19 Thread Dean Wampler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517326#comment-16517326
 ] 

Dean Wampler commented on SPARK-23427:
--

Hi, Kazuaki. Any update on this issue? Any pointers on what you discovered? 
Thanks.

> spark.sql.autoBroadcastJoinThreshold causing OOM exception in the driver 
> -
>
> Key: SPARK-23427
> URL: https://issues.apache.org/jira/browse/SPARK-23427
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: SPARK 2.0 version
>Reporter: Dhiraj
>Priority: Critical
>
> We are facing an issue around the value of spark.sql.autoBroadcastJoinThreshold.
> With spark.sql.autoBroadcastJoinThreshold set to -1 (disabled) we see driver 
> memory usage stay flat.
> With any other value (10MB, 5MB, 2MB, 1MB, 10K, 1K) we see driver memory usage 
> go up at a rate depending on the size of the autoBroadcastJoinThreshold, and we 
> eventually get an OOM exception. The problem is that the memory used by 
> auto-broadcast is not being freed up in the driver.
> The application imports Oracle tables as master dataframes, which are persisted. 
> Each job applies filters to these tables and then registers them as temp view 
> tables. SQL queries are then used to process the data further. At the end, all 
> the intermediate dataframes are unpersisted.
>  
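
For context, the workaround implied by the report above (keeping driver memory 
flat by disabling automatic broadcast joins) looks roughly like the following 
sketch; the application name is a placeholder, and only the config key and the 
{{-1}} value come from the report:

{code:scala}
import org.apache.spark.sql.SparkSession

// Setting the threshold to -1 disables automatic broadcast joins, so the driver
// never has to materialize broadcast relations for them.
val spark = SparkSession.builder()
  .appName("no-auto-broadcast") // placeholder name
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()
{code}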



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517287#comment-16517287
 ] 

Attila Zsolt Piros commented on SPARK-24578:


The copyByteBuf() along with transferTo() is called several times on a huge 
byteBuf. If we have a really huge chunk built from many small chunks, we are 
re-merging the small chunks again and again in 
[https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L140]
 although we need only a very small part of it.

Is it possible the following line would help?
{code:java}
ByteBuffer buffer = buf.nioBuffer(0, Math.min(buf.readableBytes(), 
NIO_BUFFER_LIMIT));{code}
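
To make the cost being described concrete, here is a rough standalone sketch, not 
Spark code: it assumes Netty is on the classpath, and {{NIO_BUFFER_LIMIT}} and the 
chunk sizes are illustrative stand-ins rather than Spark's actual values. It 
contrasts re-merging the whole remaining composite buffer on every call with 
exposing only a bounded slice, as suggested above.

{code:scala}
import io.netty.buffer.{ByteBuf, Unpooled}

object NioBufferSketch {
  val NIO_BUFFER_LIMIT = 256 * 1024 // stand-in cap, not Spark's actual constant

  // Pattern described above: nioBuffer() re-merges every remaining component of
  // the composite buffer on each call, even though only a small part is consumed.
  def drainByMergingAll(buf: ByteBuf): Unit =
    while (buf.isReadable) {
      val nio = buf.nioBuffer()
      buf.skipBytes(math.min(nio.remaining(), NIO_BUFFER_LIMIT))
    }

  // Suggested pattern: ask only for a bounded slice starting at the reader index.
  def drainByBoundedSlice(buf: ByteBuf): Unit =
    while (buf.isReadable) {
      val nio = buf.nioBuffer(buf.readerIndex(),
        math.min(buf.readableBytes(), NIO_BUFFER_LIMIT))
      buf.skipBytes(nio.remaining())
    }

  // Build one composite buffer out of `chunks` small 64 KB chunks.
  def freshBuf(chunks: Int): ByteBuf =
    Unpooled.wrappedBuffer(chunks,
      Seq.fill(chunks)(Unpooled.wrappedBuffer(new Array[Byte](64 * 1024))): _*)

  def timed(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  }

  def main(args: Array[String]): Unit = {
    timed("merge whole buffer each call")(drainByMergingAll(freshBuf(500)))
    timed("bounded slice each call")(drainByBoundedSlice(freshBuf(500)))
  }
}
{code}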

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors 

[jira] [Comment Edited] (SPARK-24498) Add JDK compiler for runtime codegen

2018-06-19 Thread Takeshi Yamamuro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516917#comment-16516917
 ] 

Takeshi Yamamuro edited comment on SPARK-24498 at 6/19/18 3:12 PM:
---

Probably kiszk will write more polished code for this (and open a PR), but I 
quickly made a prototype myself for benchmarks in advance: 
[https://github.com/apache/spark/compare/master...maropu:JdkCompiler] (I 
checked that all the queries in TPCDS, TPCH, and SSB could be compiled). I'll post 
TPCDS benchmark results and statistics (codegen size or something) soon.


was (Author: maropu):
Probably, kiszk will make polite code for this (and make a pr) though, I 
quickly made prototype by myself for benchmarks in advance: 
[https://github.com/apache/spark/compare/master...maropu:JdkCompiler] (I 
checked most queries in TPCDS could be compiled). I'll put TPCDS benchmark 
results and statistics (codegen size or something) soon.

> Add JDK compiler for runtime codegen
> 
>
> Key: SPARK-24498
> URL: https://issues.apache.org/jira/browse/SPARK-24498
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> In some cases, the JDK compiler can generate smaller bytecode and take less time 
> to compile than Janino. However, in other cases, Janino is better. 
> We should support both for our runtime codegen. Janino will still be our 
> default runtime codegen compiler. 
> See the related JIRAs in DRILL: 
> - https://issues.apache.org/jira/browse/DRILL-1155
> - https://issues.apache.org/jira/browse/DRILL-4778
> - https://issues.apache.org/jira/browse/DRILL-5696



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24595) What about additional support on deeply nested column?

2018-06-19 Thread Zejun Li (JIRA)
Zejun Li created SPARK-24595:


 Summary: What about additional support on deeply nested column?
 Key: SPARK-24595
 URL: https://issues.apache.org/jira/browse/SPARK-24595
 Project: Spark
  Issue Type: Question
  Components: SQL
Affects Versions: 2.3.0
Reporter: Zejun Li


I store some trajectory data in Parquet with this schema:
{code:java}
create table traj(
  id string,
  points array<struct<
    lat: double, lng: double, ...,
    candidate_road: array<struct<score: double, ...>>
  >>
){code}
It contains a lot of attributes coming from sensors. It also has a nested array 
which contains information generated during the map-matching algorithm.

 

All of my algorithms running on this dataset are trajectory-oriented, which means 
they often iterate over points and use a subset of the point attributes to do 
some computation. With this schema I can get the points of a trajectory without 
doing a `group by` and `collect_list`.

 

Because Parquet works very well on deeply nested data, I directly store it in 
Parquet format without flattening.

It works very well with Impala, because Impala has special support for 
nested data:

 
{code:java}
select
  id,
  avg_speed
from 
  traj t,
  (select avg(speed) avg_speed from t.points where time < '2018-06-19'){code}
As you can see, Impala treats an array of structs as a nested table, and can do 
some computation on array elements at a per-row level. Impala will also use 
Parquet's features to prune unused attributes in the point struct.

 

I use Spark for some complex algorithms which cannot be written in pure SQL, but I 
run into some trouble with the Spark DataFrame API:

Spark cannot do schema pruning and filter push-down on nested columns, and it 
seems there is no handy syntax to work with deeply nested data.
 * `explode` does not help in my scenario, because I need to preserve the 
trajectory-points hierarchy. If I use `explode` here, I need to do an extra `group 
by` on `id` (see the sketch after this list).
 * I can directly select `points.lat`, but that loses the structure. If I need an 
array of (lat, lng) pairs, I have to zip two arrays, and this does not work at 
deeper nesting levels, such as selecting `points.candidate_road.score`. 
 * Maybe I can use the parquet-mr package to read the files as an RDD and pass the 
read schema directly to it, but that loses Hive integration and the vectorized 
reader in Spark.
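
For concreteness, here is a minimal sketch of that explode-and-regroup round trip. 
The Parquet path is a placeholder, and the point attribute names ({{time}}, 
{{speed}}) are taken from the Impala query above; everything else is the standard 
DataFrame API.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("traj-explode").getOrCreate()
import spark.implicits._

// Hypothetical input path; schema as in the DDL above.
val traj = spark.read.parquet("/data/traj")

// Flatten the points, filter and aggregate on point attributes, then pay for an
// extra group-by on `id` to get back to one row per trajectory.
val avgSpeed = traj
  .select($"id", explode($"points").as("point"))
  .where($"point.time" < "2018-06-19")
  .groupBy($"id")
  .agg(avg($"point.speed").as("avg_speed"))
{code}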

 

So I think it would be nice to have an Impala-style subquery syntax on complex 
data, or to add some support for schema projection on nested data like:

 
{code:java}
select id, extract(points, lat, lng, extract(candidate_road, score)) from 
traj{code}
which produces a schema like:

{code:java}
|- id string
|- points array of struct
   |- lat double
   |- lng double
   |- candidate_road array of struct
      |- score double{code}
And the user can then work with the points using the desired schema, with data pruning done in Parquet.

 

 

Or is there some existing syntax that already does what I need?

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517180#comment-16517180
 ] 

Wenbo Zhao commented on SPARK-24578:


After digging into more details, this commit 
[https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a]
 came to my attention. I reverted this commit in our Spark distribution and 
found out the issue went away.

I also examined the log to verify that performance of 
[https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142]
 is back to normal. It is able to transfer the 500MB data in 2s~3s. 

However, it is still not quite clear why that commit is causing the issue.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. Here, the memory of the driver and 

[jira] [Updated] (SPARK-24519) MapStatus has 2000 hardcoded

2018-06-19 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-24519:
--
Description: 
MapStatus uses a hardcoded value of 2000 partitions to determine if it should use 
the highly compressed map status. We should make it configurable to allow users to 
more easily tune their jobs with respect to this without having to modify their 
code to change the number of partitions.  Note we can leave this as an 
internal/undocumented config for now until we have more advice for users on how to 
set this config.

Some of my reasoning:

The config gives you a way to easily change something without the user having 
to change code, redeploy the jar, and then run again. You can simply change the 
config and rerun. It also allows for easier experimentation. Changing the # of 
partitions has other side effects, whether good or bad is situation dependent. 
It can be worse: you could be increasing the # of output files when you don't 
want to, and it affects the # of tasks needed and thus the executors running in 
parallel, etc.

There have been various talks about this number at Spark summits where people 
have told customers to increase it to 2001 partitions. Note if you just do a 
search for "spark 2000 partitions" you will find various things all talking about 
this number.  This shows that people are modifying their code to take this into 
account, so it seems to me having this configurable would be better.

Once we have more advice for users we could expose this and document 
information on it.

 

  was:MapStatus uses hardcoded value of 2000 partitions to determine if it 
should use highly compressed map status. We should make it configurable.


> MapStatus has 2000 hardcoded
> 
>
> Key: SPARK-24519
> URL: https://issues.apache.org/jira/browse/SPARK-24519
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Hieu Tri Huynh
>Priority: Minor
>
> MapStatus uses a hardcoded value of 2000 partitions to determine if it should 
> use the highly compressed map status. We should make it configurable to allow 
> users to more easily tune their jobs with respect to this without having to 
> modify their code to change the number of partitions.  Note we can leave this 
> as an internal/undocumented config for now until we have more advice for users 
> on how to set this config.
> Some of my reasoning:
> The config gives you a way to easily change something without the user having 
> to change code, redeploy the jar, and then run again. You can simply change the 
> config and rerun. It also allows for easier experimentation. Changing the # 
> of partitions has other side effects, whether good or bad is situation 
> dependent. It can be worse: you could be increasing the # of output files when 
> you don't want to, and it affects the # of tasks needed and thus the executors 
> running in parallel, etc.
> There have been various talks about this number at Spark summits where people 
> have told customers to increase it to 2001 partitions. Note if you just do 
> a search for "spark 2000 partitions" you will find various things all talking 
> about this number.  This shows that people are modifying their code to take 
> this into account, so it seems to me having this configurable would be better.
> Once we have more advice for users we could expose this and document 
> information on it.
>  
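
For reference, the user-side workaround alluded to in the description (bumping the 
partition count past the hardcoded threshold instead of changing a config) is a 
one-line code change; a minimal sketch, with a placeholder dataset:

{code:scala}
import org.apache.spark.sql.SparkSession

// Repartitioning to more than 2000 partitions makes the resulting shuffle use the
// highly compressed map status, at the cost of changing the job's partitioning.
val spark = SparkSession.builder().appName("force-compressed-mapstatus").getOrCreate()
val data = spark.sparkContext.parallelize(1 to 1000000) // placeholder dataset
val shuffled = data.map(x => (x % 1000, x)).repartition(2001)
println(shuffled.getNumPartitions) // 2001
{code}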



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24498) Add JDK compiler for runtime codegen

2018-06-19 Thread Kazuaki Ishizaki (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517097#comment-16517097
 ] 

Kazuaki Ishizaki commented on SPARK-24498:
--

[~maropu] thank you, let us use this as a start point.

> Add JDK compiler for runtime codegen
> 
>
> Key: SPARK-24498
> URL: https://issues.apache.org/jira/browse/SPARK-24498
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> In some cases, the JDK compiler can generate smaller bytecode and take less time 
> to compile than Janino. However, in other cases, Janino is better. 
> We should support both for our runtime codegen. Janino will still be our 
> default runtime codegen compiler. 
> See the related JIRAs in DRILL: 
> - https://issues.apache.org/jira/browse/DRILL-1155
> - https://issues.apache.org/jira/browse/DRILL-4778
> - https://issues.apache.org/jira/browse/DRILL-5696



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24594) Introduce metrics for YARN executor allocation problems

2018-06-19 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517028#comment-16517028
 ] 

Attila Zsolt Piros commented on SPARK-24594:


I am working on this.

> Introduce metrics for YARN executor allocation problems 
> 
>
> Key: SPARK-24594
> URL: https://issues.apache.org/jira/browse/SPARK-24594
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: Attila Zsolt Piros
>Priority: Major
>
> Within SPARK-16630 the idea came up to introduce metrics for YARN executor 
> allocation problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24594) Introduce metrics for YARN executor allocation problems

2018-06-19 Thread Attila Zsolt Piros (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Zsolt Piros updated SPARK-24594:
---
Description: Within SPARK-16630 the idea came up to introduce metrics for YARN 
executor allocation problems.  (was: Within 
https://issues.apache.org/jira/browse/SPARK-16630 it come up to introduce 
metrics for  YARN executor allocation problems.)

> Introduce metrics for YARN executor allocation problems 
> 
>
> Key: SPARK-24594
> URL: https://issues.apache.org/jira/browse/SPARK-24594
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: Attila Zsolt Piros
>Priority: Major
>
> Within SPARK-16630 the idea came up to introduce metrics for YARN executor 
> allocation problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24594) Introduce metrics for YARN executor allocation problems

2018-06-19 Thread Attila Zsolt Piros (JIRA)
Attila Zsolt Piros created SPARK-24594:
--

 Summary: Introduce metrics for YARN executor allocation problems 
 Key: SPARK-24594
 URL: https://issues.apache.org/jira/browse/SPARK-24594
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 2.4.0
Reporter: Attila Zsolt Piros


Within https://issues.apache.org/jira/browse/SPARK-16630 the idea came up to 
introduce metrics for YARN executor allocation problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-06-19 Thread Paul Staab (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516990#comment-16516990
 ] 

Paul Staab edited comment on SPARK-21063 at 6/19/18 12:01 PM:
--

I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create a Hive dialect which uses the correct quotes for escaping the column 
names:
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialect

// Dialect that escapes column names with backticks, as Hive expects.
object HiveDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
It only works when both registering the dialect and using fetchsize. There was 
a merge request for adding the dialect to spark

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.


was (Author: paulstaab):
I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create an Hive Dialect which uses the correct quotes for escaping the column 
names:
{code:java}
object HiveDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
  }
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
It only works when registering the dialect and using fetchsize. There was a 
merge request for adding the dialect to spark

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying over JDBC works properly using the hive-jdbc driver from version 1.1.1.
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All manipulations like: 
> {code:java}
> df.select("*").show()
> {code}
> returns empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-06-19 Thread Paul Staab (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516990#comment-16516990
 ] 

Paul Staab edited comment on SPARK-21063 at 6/19/18 11:59 AM:
--

I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create a Hive dialect that quotes column names with backticks:
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialect

object HiveDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
The workaround only works when the dialect is registered and the fetchsize is set. There was a pull request for adding the dialect to Spark

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.


was (Author: paulstaab):
I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create an Hive Dialect which uses the correct quotes for escaping the column 
names:
{code:java}
object HiveDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
  }
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
It only works when registering the dialect and using fetchsize. There was a 
merge request for adding the dialect to spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying via JDBC works properly using the hive-jdbc driver version 1.1.1.
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All operations like:
> {code:java}
> df.select("*").show()
> {code}
> return an empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-06-19 Thread Paul Staab (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516990#comment-16516990
 ] 

Paul Staab edited comment on SPARK-21063 at 6/19/18 11:59 AM:
--

I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create a Hive dialect that quotes column names with backticks:
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialect

object HiveDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
The workaround only works when the dialect is registered and the fetchsize is set. There was a pull request for adding the dialect to Spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.


was (Author: paulstaab):
I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create an Hive Dialect which uses the correct quotes for escaping the column 
names:
{code:java}
object HiveDialect extends JdbcDialect with Logging {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
  }
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
It only works when registering the dialect and using fetchsize. There was a 
merge request for adding the dialect to spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying via JDBC works properly using the hive-jdbc driver version 1.1.1.
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All operations like:
> {code:java}
> df.select("*").show()
> {code}
> return an empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-06-19 Thread Paul Staab (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516990#comment-16516990
 ] 

Paul Staab edited comment on SPARK-21063 at 6/19/18 11:58 AM:
--

I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create a Hive dialect that quotes column names with backticks:
{code:java}
object HiveDialect extends JdbcDialect with Logging {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
}
{code}
This is taken from [https://github.com/apache/spark/pull/19238]

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
The workaround only works when the dialect is registered and the fetchsize is set. There was a pull request for adding the dialect to Spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.


was (Author: paulstaab):
I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create an Hive Dialect which uses the correct quotes for escaping the column 
names:
{code:java}
object HiveDialect extends JdbcDialect with Logging {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
  }
}
{code}
This is taken from https://github.com/apache/spark/pull/19238

 

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
It only works when registering the dialect and using fetchsize. There was a 
merge request for adding the dialect to spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying via JDBC works properly using the hive-jdbc driver version 1.1.1.
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All operations like:
> {code:java}
> df.select("*").show()
> {code}
> return an empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-06-19 Thread Paul Staab (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516990#comment-16516990
 ] 

Paul Staab commented on SPARK-21063:


I was able to find a workaround for this problem on Spark 2.1.0:

 

1. Create a Hive dialect that quotes column names with backticks:
{code:java}
object HiveDialect extends JdbcDialect with Logging {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"
}
{code}
This is taken from https://github.com/apache/spark/pull/19238

 

2. Register it before making the call with spark.read.jdbc
{code:java}
JdbcDialects.registerDialect(HiveDialect)
{code}
3. Execute spark.read.jdbc with fetchsize option
{code:java}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1",
  properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": 
"10"}).show()
{code}
The workaround only works when the dialect is registered and the fetchsize is set. There was a pull request for adding the dialect to Spark by default

[https://github.com/apache/spark/pull/19238]

but unfortunately it was not merged.

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying via JDBC works properly using the hive-jdbc driver version 1.1.1.
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All operations like:
> {code:java}
> df.select("*").show()
> {code}
> return an empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24498) Add JDK compiler for runtime codegen

2018-06-19 Thread Takeshi Yamamuro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516917#comment-16516917
 ] 

Takeshi Yamamuro commented on SPARK-24498:
--

kiszk will probably write the proper code for this (and open a PR), but I quickly made a prototype myself for benchmarking in advance: 
[https://github.com/apache/spark/compare/master...maropu:JdkCompiler] (I 
checked that most TPC-DS queries could be compiled). I'll post TPC-DS benchmark 
results and statistics (codegen size and so on) soon.
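
For readers unfamiliar with the underlying mechanism, here is a minimal standalone Scala sketch of runtime compilation through the JDK compiler API (javax.tools). It is not Spark's codegen path and not the prototype linked above, just the bare API; the file path and generated class are made up for illustration.
{code:java}
import java.nio.file.{Files, Paths}

import javax.tools.ToolProvider

object JdkCompilerSketch {
  def main(args: Array[String]): Unit = {
    // Write a tiny generated source file, as a codegen backend would.
    val src = Paths.get("/tmp/Generated.java")
    Files.write(src, "public class Generated { public static int inc(int x) { return x + 1; } }".getBytes("UTF-8"))

    // ToolProvider returns the in-process javac; it is null on a JRE that ships without the compiler.
    val javac = ToolProvider.getSystemJavaCompiler
    require(javac != null, "run this on a JDK, not a bare JRE")

    // Compile in-process; exit code 0 means success and Generated.class lands next to the source.
    val exitCode = javac.run(null, null, null, src.toString)
    println(s"javac exit code: $exitCode")
  }
}
{code}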

> Add JDK compiler for runtime codegen
> 
>
> Key: SPARK-24498
> URL: https://issues.apache.org/jira/browse/SPARK-24498
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> In some cases, the JDK compiler can generate smaller bytecode and take less time 
> to compile than Janino. However, in other cases, Janino is better. 
> We should support both for our runtime codegen. Janino will still be our 
> default runtime codegen compiler. 
> See the related JIRAs in DRILL: 
> - https://issues.apache.org/jira/browse/DRILL-1155
> - https://issues.apache.org/jira/browse/DRILL-4778
> - https://issues.apache.org/jira/browse/DRILL-5696



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24593) can not find hive table after spark streaming started

2018-06-19 Thread lhq (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lhq updated SPARK-24593:

Environment: 
{code:java}
// demo code
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network every second.
*
* Usage: JavaSqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk <port>`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost <port>`
*/
public final class JavaSqlNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount  ");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));

// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(x -> 
Arrays.asList(SPACE.split(x)).iterator());

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD((rdd, time) -> {
SparkSession spark = 
JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

// Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
JavaRecord record = new JavaRecord();
record.setWord(word);
return record;
});
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");

// Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
System.out.println("= " + time + "=");
wordCountsDataFrame.show();

// accessing the Hive table fails here
spark.sql("select * from test_fch");
});

ssc.start();
ssc.awaitTermination();
}
}

/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
}
return instance;
}
}

{code}

  was:
{code:java}
// demo code
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 

[jira] [Created] (SPARK-24593) can not find hive table after spark streaming started

2018-06-19 Thread lhq (JIRA)
lhq created SPARK-24593:
---

 Summary: can not find hive table after spark streaming started
 Key: SPARK-24593
 URL: https://issues.apache.org/jira/browse/SPARK-24593
 Project: Spark
  Issue Type: Bug
  Components: DStreams, SQL
Affects Versions: 2.3.1, 2.3.0
 Environment: {code:java}
// demo code
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network every second.
*
* Usage: JavaSqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk <port>`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost <port>`
*/
public final class JavaSqlNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount  ");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));

// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(x -> 
Arrays.asList(SPACE.split(x)).iterator());

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD((rdd, time) -> {
SparkSession spark = 
JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

// Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
JavaRecord record = new JavaRecord();
record.setWord(word);
return record;
});
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");

// Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
System.out.println("= " + time + "=");
wordCountsDataFrame.show();
spark.sql("select * from test_fch");
});

ssc.start();
ssc.awaitTermination();
}
}

/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
}
return instance;
}
}

{code}
Reporter: lhq


org.apache.spark.sql.AnalysisException: Table or view not found: test_fch; line 
1 pos 14
 at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:665)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:617)
 at 

[jira] [Created] (SPARK-24592) can not find hive table after spark streaming start

2018-06-19 Thread lhq (JIRA)
lhq created SPARK-24592:
---

 Summary: can not find hive table after spark streaming start
 Key: SPARK-24592
 URL: https://issues.apache.org/jira/browse/SPARK-24592
 Project: Spark
  Issue Type: Bug
  Components: DStreams, SQL
Affects Versions: 2.3.1, 2.3.0
 Environment:  
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network every second.
*
* Usage: JavaSqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk <port>`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost <port>`
*/
public final class JavaSqlNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount  ");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));

// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(x -> 
Arrays.asList(SPACE.split(x)).iterator());

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD((rdd, time) -> {
SparkSession spark = 
JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

// Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
JavaRecord record = new JavaRecord();
record.setWord(word);
return record;
});
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");

// Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
System.out.println("= " + time + "=");
wordCountsDataFrame.show();
spark.sql("select * from test_fch");
});

ssc.start();
ssc.awaitTermination();
}
}

/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
}
return instance;
}
}

{code}
 
Reporter: lhq


org.apache.spark.sql.AnalysisException: Table or view not found: test_fch; line 
1 pos 14
 at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:665)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:617)
 at 

[jira] [Commented] (SPARK-24458) Invalid PythonUDF check_1(), requires attributes from more than one child

2018-06-19 Thread Ruben Berenguel (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516908#comment-16516908
 ] 

Ruben Berenguel commented on SPARK-24458:
-

That's what I thought [~AbdealiJK] (I had `1, 2, 3`) but all cases seem to work 
(sample for check_2, using a.csv with just the number 1).

I'm using a just-built Spark (pulled a couple days ago, so should be pretty 
close to 2.3.1, it is formally 2.4.0-SNAPSHOT)

 
{noformat}
>>> def check_2(a):
... return "PASS"
...
>>> testfunc = F.udf(T.DoubleType())(check_2)
>>> data = data.withColumn('check_out', testfunc(F.lit(0)))
>>> data.show()
+---+-+
|_c0|check_out|
+---+-+
| 1| null|
+---+-+
>>> testfunc = F.udf(T.StringType())(check_2)
>>> data = data.withColumn('check_out', testfunc(F.lit(0)))
>>> data.show()
+---+-+
|_c0|check_out|
+---+-+
| 1| PASS|
+---+-+
{noformat}
 

> Invalid PythonUDF check_1(), requires attributes from more than one child
> -
>
> Key: SPARK-24458
> URL: https://issues.apache.org/jira/browse/SPARK-24458
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 (local mode)
> Mac OSX
>Reporter: Abdeali Kothari
>Priority: Major
>
> I was trying out a very large query execution plan I have and I got the error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires 
> attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at 
> org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at 
> 

[jira] [Commented] (SPARK-24458) Invalid PythonUDF check_1(), requires attributes from more than one child

2018-06-19 Thread Abdeali Kothari (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516899#comment-16516899
 ] 

Abdeali Kothari commented on SPARK-24458:
-

It can be anything. I had a file with 1 column, 1 row with the value "1" in the 
cell :D

$ cat a.csv
1
$

> Invalid PythonUDF check_1(), requires attributes from more than one child
> -
>
> Key: SPARK-24458
> URL: https://issues.apache.org/jira/browse/SPARK-24458
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 (local mode)
> Mac OSX
>Reporter: Abdeali Kothari
>Priority: Major
>
> I was trying out a very large query execution plan I have and I got the error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires 
> attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at 
> org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at 
> org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
>  at 
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> 

[jira] [Commented] (SPARK-24458) Invalid PythonUDF check_1(), requires attributes from more than one child

2018-06-19 Thread Ruben Berenguel (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516890#comment-16516890
 ] 

Ruben Berenguel commented on SPARK-24458:
-

[~AbdealiJK] what does your `a.csv` file contain in the sample above?

> Invalid PythonUDF check_1(), requires attributes from more than one child
> -
>
> Key: SPARK-24458
> URL: https://issues.apache.org/jira/browse/SPARK-24458
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 (local mode)
> Mac OSX
>Reporter: Abdeali Kothari
>Priority: Major
>
> I was trying out a very large query execution plan I have and I got the error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires 
> attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at 
> org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at 
> org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
>  at 
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at 

[jira] [Comment Edited] (SPARK-5158) Allow for keytab-based HDFS security in Standalone mode

2018-06-19 Thread Jelmer Kuperus (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516856#comment-16516856
 ] 

Jelmer Kuperus edited comment on SPARK-5158 at 6/19/18 9:33 AM:


I ended up with the following workaround, which at first glance seems to work:

1. Create a _.java.login.config_ file in the home directory of the user running Spark, with the following contents:
{noformat}
com.sun.security.jgss.krb5.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache="true"
  ticketCache="/tmp/krb5cc_0"
  keyTab="/path/to/my.keytab"
  principal="u...@foo.com";
};{noformat}
2. Put a krb5.conf file at /etc/krb5.conf.

3. Place your Hadoop configuration in /etc/hadoop/conf and in `core-site.xml` set:
 * fs.defaultFS to webhdfs://your_hostname:14000/webhdfs/v1
 * hadoop.security.authentication to kerberos
 * hadoop.security.authorization to true

4. Make sure the Hadoop config is on the classpath of Spark, e.g. the process should have something like this in it:
{noformat}
-cp /etc/spark/:/usr/share/spark/jars/*:/etc/hadoop/conf/{noformat}
 

This configures a single principal for the entire Spark process. If you want to change the default paths to the configuration files, you can use:
{noformat}
-Djava.security.krb5.conf=/etc/krb5.conf 
-Djava.security.auth.login.config=/path/to/jaas.conf{noformat}
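
As a quick sanity check that the keytab and krb5.conf are actually usable before wiring them into the JAAS setup above, here is a small standalone Scala sketch using Hadoop's UserGroupInformation (principal and paths are the same placeholders as above):
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

object KeytabCheck {
  def main(args: Array[String]): Unit = {
    // Picks up core-site.xml etc. from /etc/hadoop/conf if that directory is on the classpath.
    val hadoopConf = new Configuration()
    UserGroupInformation.setConfiguration(hadoopConf)

    // Log in directly from the keytab, independently of the JAAS file.
    UserGroupInformation.loginUserFromKeytab("u...@foo.com", "/path/to/my.keytab")

    // If the login worked, listing the filesystem root should succeed.
    val fs = FileSystem.get(hadoopConf)
    fs.listStatus(new Path("/")).foreach(status => println(status.getPath))
  }
}
{code}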
 

 

 


was (Author: jelmer):
I ended up with the following workaround which at first glance seems to work

1. create a `.java.login.config` file in the home directory of the spark with 
the following contents
{noformat}
com.sun.security.jgss.krb5.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache="true"
  ticketCache="/tmp/krb5cc_0"
  keyTab="/path/to/my.keytab"
  principal="u...@foo.com";
};{noformat}
2. put a krb5.conf file in /etc/krb5.conf

3. place your hadoop configuration in /etc/hadoop/conf and in `core-site.xml` 
set : 
 * fs.defaultFS to webhdfs://your_hostname:14000/webhdfs/v1
 * hadoop.security.authentication to kerberos
 * hadoop.security.authorization to true

4. make sure the hadoop config gets is on the classpath of spark. Eg the 
process should have something like this in it
{noformat}
-cp /etc/spark/:/usr/share/spark/jars/*:/etc/hadoop/conf/{noformat}
 

This configures a single principal for the enire spark process. If you want to 
change the default paths to the configuration files you can use
{noformat}
-Djava.security.krb5.conf=/etc/krb5.conf 
-Djava.security.auth.login.config=/path/to/jaas.conf{noformat}
 

 

 

> Allow for keytab-based HDFS security in Standalone mode
> ---
>
> Key: SPARK-5158
> URL: https://issues.apache.org/jira/browse/SPARK-5158
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Matthew Cheah
>Priority: Critical
>
> There have been a handful of patches for allowing access to Kerberized HDFS 
> clusters in standalone mode. The main reason we haven't accepted these 
> patches has been that they rely on insecure distribution of token files from 
> the driver to the other components.
> As a simpler solution, I wonder if we should just provide a way to have the 
> Spark driver and executors independently log in and acquire credentials using 
> a keytab. This would work for users who have a dedicated, single-tenant, 
> Spark clusters (i.e. they are willing to have a keytab on every machine 
> running Spark for their application). It wouldn't address all possible 
> deployment scenarios, but if it's simple I think it's worth considering.
> This would also work for Spark streaming jobs, which often run on dedicated 
> hardware since they are long-running services.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-5158) Allow for keytab-based HDFS security in Standalone mode

2018-06-19 Thread Jelmer Kuperus (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516856#comment-16516856
 ] 

Jelmer Kuperus edited comment on SPARK-5158 at 6/19/18 9:32 AM:


I ended up with the following workaround which at first glance seems to work

1. create a `.java.login.config` file in the home directory of the spark with 
the following contents
{noformat}
com.sun.security.jgss.krb5.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache="true"
  ticketCache="/tmp/krb5cc_0"
  keyTab="/path/to/my.keytab"
  principal="u...@foo.com";
};{noformat}
2. put a krb5.conf file in /etc/krb5.conf

3. place your hadoop configuration in /etc/hadoop/conf and in `core-site.xml` 
set : 
 * fs.defaultFS to webhdfs://your_hostname:14000/webhdfs/v1
 * hadoop.security.authentication to kerberos
 * hadoop.security.authorization to true

4. Make sure the Hadoop config is on the classpath of Spark, e.g. the process should have something like this in it:
{noformat}
-cp /etc/spark/:/usr/share/spark/jars/*:/etc/hadoop/conf/{noformat}
 

This configures a single principal for the entire Spark process. If you want to change the default paths to the configuration files, you can use:
{noformat}
-Djava.security.krb5.conf=/etc/krb5.conf 
-Djava.security.auth.login.config=/path/to/jaas.conf{noformat}
 

 

 


was (Author: jelmer):
I ended up with the following workaround which at first glance seems to work

1. create a `.java.login.config` file in the home directory of the spark with 
the following contents


{noformat}
com.sun.security.jgss.krb5.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache="true"
  ticketCache="/tmp/krb5cc_0"
  keyTab="/path/to/my.keytab"
  principal="u...@foo.com";
};{noformat}

2. put a krb5.conf file in /etc/krb5.conf

3. place your hadoop configuration in /etc/hadoop/conf and in `core-site.xml` 
set : 
 * fs.defaultFS to webhdfs://your_hostname:14000/webhdfs/v1
 * hadoop.security.authentication to kerberos
 * hadoop.security.authorization to true

4. make sure the hadoop config gets is on the classpath of spark. Eg the 
process should have something like this in it
{noformat}
-cp /etc/spark/:/usr/share/spark/jars/*:/etc/hadoop/conf/{noformat}
 

 

> Allow for keytab-based HDFS security in Standalone mode
> ---
>
> Key: SPARK-5158
> URL: https://issues.apache.org/jira/browse/SPARK-5158
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Matthew Cheah
>Priority: Critical
>
> There have been a handful of patches for allowing access to Kerberized HDFS 
> clusters in standalone mode. The main reason we haven't accepted these 
> patches has been that they rely on insecure distribution of token files from 
> the driver to the other components.
> As a simpler solution, I wonder if we should just provide a way to have the 
> Spark driver and executors independently log in and acquire credentials using 
> a keytab. This would work for users who have a dedicated, single-tenant, 
> Spark clusters (i.e. they are willing to have a keytab on every machine 
> running Spark for their application). It wouldn't address all possible 
> deployment scenarios, but if it's simple I think it's worth considering.
> This would also work for Spark streaming jobs, which often run on dedicated 
> hardware since they are long-running services.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24467) VectorAssemblerEstimator

2018-06-19 Thread Nick Pentreath (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516861#comment-16516861
 ] 

Nick Pentreath commented on SPARK-24467:


One option is to do the same as we did for the one-hot encoder: we could create a 
new Estimator/Model pair and deprecate the old one for 2.4.0. Then for 3.0, 
we could remove the old one.

> VectorAssemblerEstimator
> 
>
> Key: SPARK-24467
> URL: https://issues.apache.org/jira/browse/SPARK-24467
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> In [SPARK-22346], I believe I made a wrong API decision: I recommended adding 
> `VectorSizeHint` instead of making `VectorAssembler` into an Estimator since 
> I thought the latter option would break most workflows. However, I should 
> have proposed:
> * Add a Param to VectorAssembler for specifying the sizes of Vectors in the 
> inputCols.  This Param can be optional.  If not given, then VectorAssembler 
> will behave as it does now.  If given, then VectorAssembler can use that info 
> instead of figuring out the Vector sizes via metadata or examining Rows in 
> the data (though it could do consistency checks).
> * Add a VectorAssemblerEstimator which gets the Vector lengths from data and 
> produces a VectorAssembler with the vector lengths Param specified.
> This will not break existing workflows.  Migrating to 
> VectorAssemblerEstimator will be easier than adding VectorSizeHint since it 
> will not require users to manually input Vector lengths.
> Note: Even with this Estimator, VectorSizeHint might prove useful for other 
> things in the future which require vector length metadata, so we could 
> consider keeping it rather than deprecating it.
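
To make the proposal concrete, here is a purely hypothetical Scala sketch of how the two pieces could look to a user. Neither setInputSizes nor VectorAssemblerEstimator exists in Spark; the names are illustrative only, and the hypothetical calls are left commented out so the snippet compiles against current Spark.
{code:java}
import org.apache.spark.ml.feature.VectorAssembler

// Option A: an optional Param on VectorAssembler carrying the known input vector
// sizes, so no metadata lookup or data scan is needed.
val assembler = new VectorAssembler()
  .setInputCols(Array("colA", "colB"))
  .setOutputCol("features")
  // .setInputSizes(Array(3, 2))                    // hypothetical Param proposed above

// Option B: an Estimator that scans the data once to learn the sizes and
// produces a VectorAssembler with that Param filled in.
// val estimator = new VectorAssemblerEstimator()   // hypothetical class
//   .setInputCols(Array("colA", "colB"))
//   .setOutputCol("features")
// val model: VectorAssembler = estimator.fit(trainingDF)
{code}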



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5158) Allow for keytab-based HDFS security in Standalone mode

2018-06-19 Thread Jelmer Kuperus (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516856#comment-16516856
 ] 

Jelmer Kuperus commented on SPARK-5158:
---

I ended up with the following workaround which at first glance seems to work

1. create a `.java.login.config` file in the home directory of the spark with 
the following contents


{noformat}
com.sun.security.jgss.krb5.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache="true"
  ticketCache="/tmp/krb5cc_0"
  keyTab="/path/to/my.keytab"
  principal="u...@foo.com";
};{noformat}

2. put a krb5.conf file in /etc/krb5.conf

3. place your hadoop configuration in /etc/hadoop/conf and in `core-site.xml` 
set : 
 * fs.defaultFS to webhdfs://your_hostname:14000/webhdfs/v1
 * hadoop.security.authentication to kerberos
 * hadoop.security.authorization to true

4. Make sure the Hadoop config is on the classpath of Spark, e.g. the process should have something like this in it:
{noformat}
-cp /etc/spark/:/usr/share/spark/jars/*:/etc/hadoop/conf/{noformat}
 

 

> Allow for keytab-based HDFS security in Standalone mode
> ---
>
> Key: SPARK-5158
> URL: https://issues.apache.org/jira/browse/SPARK-5158
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Matthew Cheah
>Priority: Critical
>
> There have been a handful of patches for allowing access to Kerberized HDFS 
> clusters in standalone mode. The main reason we haven't accepted these 
> patches has been that they rely on insecure distribution of token files from 
> the driver to the other components.
> As a simpler solution, I wonder if we should just provide a way to have the 
> Spark driver and executors independently log in and acquire credentials using 
> a keytab. This would work for users who have a dedicated, single-tenant, 
> Spark clusters (i.e. they are willing to have a keytab on every machine 
> running Spark for their application). It wouldn't address all possible 
> deployment scenarios, but if it's simple I think it's worth considering.
> This would also work for Spark streaming jobs, which often run on dedicated 
> hardware since they are long-running services.
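> A minimal sketch of the keytab idea using Hadoop's UserGroupInformation API 
> (principal and keytab path are placeholders; this only illustrates the 
> independent login, not a concrete Spark implementation):
> {code}
> // Each JVM (driver and executor) logs in on its own from a local keytab,
> // instead of receiving token files shipped from the driver.
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.security.UserGroupInformation
> 
> val hadoopConf = new Configuration()
> hadoopConf.set("hadoop.security.authentication", "kerberos")
> UserGroupInformation.setConfiguration(hadoopConf)
> UserGroupInformation.loginUserFromKeytab(
>   "spark/worker-host@EXAMPLE.COM",        // placeholder principal
>   "/etc/security/keytabs/spark.keytab")   // placeholder keytab path
> {code}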



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24423) Add a new option `query` for JDBC sources

2018-06-19 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516732#comment-16516732
 ] 

Apache Spark commented on SPARK-24423:
--

User 'dilipbiswal' has created a pull request for this issue:
https://github.com/apache/spark/pull/21590

> Add a new option `query` for JDBC sources
> -
>
> Key: SPARK-24423
> URL: https://issues.apache.org/jira/browse/SPARK-24423
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Currently, our JDBC connector provides the option `dbtable` for users to 
> specify the to-be-loaded JDBC source table. 
> {code} 
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", "dbName.tableName")
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> Normally, users do not fetch the whole JDBC table due to the poor 
> performance/throughput of JDBC; instead, they typically fetch only a small 
> subset of the data. Advanced users can pass a subquery as the option:
> {code} 
>  val query = """ (select * from tableName limit 10) as tmp """
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> However, this is not straightforward for end users. We should simply allow 
> users to specify the query via a new option `query` and handle the wrapping 
> complexity for them:
> {code} 
>  val query = """select * from tableName limit 10"""
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("query", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  Users are not allowed to specify query and dbtable at the same time. 
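> A rough sketch of the handling the connector could take over (the alias 
> generation and the mutual-exclusion check shown here are illustrative, not 
> the actual data source code):
> {code}
> // Illustrative only: resolve what goes into the generated SQL's FROM clause.
> def resolveRelation(options: Map[String, String]): String =
>   (options.get("dbtable"), options.get("query")) match {
>     case (Some(_), Some(_)) =>
>       throw new IllegalArgumentException(
>         "Options 'dbtable' and 'query' cannot both be specified.")
>     case (None, None) =>
>       throw new IllegalArgumentException(
>         "One of the options 'dbtable' or 'query' is required.")
>     case (Some(table), None) => table
>     // Wrap the user query in a parenthesized subquery with a generated alias,
>     // which is exactly what users previously had to write by hand.
>     case (None, Some(query)) => s"($query) spark_gen_subquery"
>   }
> {code}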



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24423) Add a new option `query` for JDBC sources

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24423:


Assignee: Apache Spark

> Add a new option `query` for JDBC sources
> -
>
> Key: SPARK-24423
> URL: https://issues.apache.org/jira/browse/SPARK-24423
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Apache Spark
>Priority: Major
>
> Currently, our JDBC connector provides the option `dbtable` for users to 
> specify the to-be-loaded JDBC source table. 
> {code} 
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", "dbName.tableName")
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> Normally, users do not fetch the whole JDBC table due to the poor 
> performance/throughput of JDBC; instead, they typically fetch only a small 
> subset of the data. Advanced users can pass a subquery as the option:
> {code} 
>  val query = """ (select * from tableName limit 10) as tmp """
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> However, this is not straightforward for end users. We should simply allow 
> users to specify the query via a new option `query` and handle the wrapping 
> complexity for them:
> {code} 
>  val query = """select * from tableName limit 10"""
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("query", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  Users are not allowed to specify query and dbtable at the same time. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24423) Add a new option `query` for JDBC sources

2018-06-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24423:


Assignee: (was: Apache Spark)

> Add a new option `query` for JDBC sources
> -
>
> Key: SPARK-24423
> URL: https://issues.apache.org/jira/browse/SPARK-24423
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Currently, our JDBC connector provides the option `dbtable` for users to 
> specify the to-be-loaded JDBC source table. 
> {code} 
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", "dbName.tableName")
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> Normally, users do not fetch the whole JDBC table due to the poor 
> performance/throughput of JDBC; instead, they typically fetch only a small 
> subset of the data. Advanced users can pass a subquery as the option:
> {code} 
>  val query = """ (select * from tableName limit 10) as tmp """
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("dbtable", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
> However, this is not straightforward for end users. We should simply allow 
> users to specify the query via a new option `query` and handle the wrapping 
> complexity for them:
> {code} 
>  val query = """select * from tableName limit 10"""
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("query", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  Users are not allowed to specify query and dbtable at the same time. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org