[jira] [Commented] (SPARK-26713) PipedRDD may hold stdin writer and stdout reader threads even if the task is finished

2019-01-23 Thread Xianjin YE (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750836#comment-16750836
 ] 

Xianjin YE commented on SPARK-26713:


I have fixed and tested this issue on our internal cluster and will submit a PR 
soon.

> PipedRDD may hold stdin writer and stdout reader threads even if the task is 
> finished
> 
>
> Key: SPARK-26713
> URL: https://issues.apache.org/jira/browse/SPARK-26713
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.3, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Xianjin YE
>Priority: Major
>
> During an investigation of an OOM in one internal production job, I found that 
> PipedRDD leaks memory. After some digging, the problem comes down to the fact 
> that PipedRDD doesn't release its stdin writer and stdout reader threads even 
> if the task is finished.
>  
> PipedRDD creates two threads: a stdin writer and a stdout reader. If we are 
> lucky and the task finishes normally, these two threads exit normally. If the 
> subprocess (the pipe command) fails, the task will be marked failed; however, 
> the stdin writer will keep running until it exhausts its parent RDD's 
> iterator. There is even a race condition with ShuffledRDD + PipedRDD: the 
> ShuffleBlockFetcherIterator is cleaned up at task completion, which hangs the 
> stdin writer thread and leaks memory. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26713) PipedRDD may hold stdin writer and stdout reader threads even if the task is finished

2019-01-23 Thread Xianjin YE (JIRA)
Xianjin YE created SPARK-26713:
--

 Summary: PipedRDD may hold stdin writer and stdout reader threads 
even if the task is finished
 Key: SPARK-26713
 URL: https://issues.apache.org/jira/browse/SPARK-26713
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.3.2, 2.3.1, 2.3.0, 2.2.3, 2.1.3
Reporter: Xianjin YE


During an investigation of an OOM in one internal production job, I found that 
PipedRDD leaks memory. After some digging, the problem comes down to the fact 
that PipedRDD doesn't release its stdin writer and stdout reader threads even if 
the task is finished.

PipedRDD creates two threads: a stdin writer and a stdout reader. If we are 
lucky and the task finishes normally, these two threads exit normally. If the 
subprocess (the pipe command) fails, the task will be marked failed; however, 
the stdin writer will keep running until it exhausts its parent RDD's iterator. 
There is even a race condition with ShuffledRDD + PipedRDD: the 
ShuffleBlockFetcherIterator is cleaned up at task completion, which hangs the 
stdin writer thread and leaks memory. 
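A rough way to reproduce this failure mode in spark-shell (a hedged sketch only; the pipe command, data sizes and partition count are illustrative, not the internal job):

{code:java}
// Hypothetical reproduction sketch: the piped command reads one line and then
// exits non-zero, so the task fails while PipedRDD's stdin writer thread may
// still be draining the parent (shuffle) iterator in the background.
val piped = sc.parallelize(1 to 10000000, 100)
  .map(i => (i % 1000, i.toString))
  .groupByKey()                                // forces a shuffle, so the parent
  .flatMap(_._2)                               // iterator is a shuffle-fetch iterator
  .pipe(Seq("sh", "-c", "read line; exit 1"))  // subprocess fails almost immediately

piped.count()
// With the bug, a stdin writer thread per failed task can stay alive holding the
// shuffle iterator, so the memory it references is not released with the task.
{code}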






[jira] [Commented] (SPARK-26699) Dataset column output discrepancies

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750761#comment-16750761
 ] 

Hyukjin Kwon commented on SPARK-26699:
--

WrappedArray is a {{Seq}} anyway, so it shouldn't be a big deal.
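For illustration, a small spark-shell sketch (hypothetical schema, not the reporter's data) showing that the WrappedArray prefix is only a difference in how the Seq is rendered, not in the data:

{code:java}
// An array-typed column comes back to Scala as a Seq whose runtime class is
// typically WrappedArray; converting it changes the rendering, not the elements.
import spark.implicits._

val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "locations")
val locations = df.head().getAs[Seq[String]]("locations")
println(locations.getClass) // class scala.collection.mutable.WrappedArray$ofRef
println(locations)          // WrappedArray(a, b)
println(locations.toList)   // List(a, b) -- same elements, different toString
{code}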

> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Question
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (i.e., standalone in Eclipse) with the same 
> parquet input files, the output is:
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, my output is as below:
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's appending WrappedArray :(
> I am using Apache Spark 2.3.2 and EMR version 5.19.0. What could be the reason 
> for the discrepancies in the output of certain table columns?






[jira] [Resolved] (SPARK-26699) Dataset column output discrepancies

2019-01-23 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26699.
--
Resolution: Invalid

Questions should go to the mailing list; you are likely to get a better answer there.

> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Question
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (i.e., standalone in Eclipse) with the same 
> parquet input files, the output is:
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, my output is as below:
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's appending WrappedArray :(
> I am using Apache Spark 2.3.2 and EMR version 5.19.0. What could be the reason 
> for the discrepancies in the output of certain table columns?






[jira] [Commented] (SPARK-24615) Accelerator-aware task scheduling for Spark

2019-01-23 Thread Felix Cheung (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750751#comment-16750751
 ] 

Felix Cheung commented on SPARK-24615:
--

We are interested to know as well.

 

[~mengxr] touched on this maybe Oct/Nov 2018, but I haven't heard anything else 
since.

> Accelerator-aware task scheduling for Spark
> ---
>
> Key: SPARK-24615
> URL: https://issues.apache.org/jira/browse/SPARK-24615
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Saisai Shao
>Priority: Major
>  Labels: Hydrogen, SPIP
>
> In the machine learning area, accelerator cards (GPU, FPGA, TPU) are 
> predominant compared to CPUs. To make the current Spark architecture work 
> with accelerator cards, Spark itself should understand the existence of 
> accelerators and know how to schedule tasks onto executors that are equipped 
> with them.
> Spark's current scheduler schedules tasks based on data locality plus the 
> availability of CPUs. This introduces some problems when scheduling tasks 
> that require accelerators:
>  # CPU cores usually outnumber accelerators on a node, so using CPU cores to 
> schedule accelerator-required tasks introduces a mismatch.
>  # In a cluster, we can always assume that CPUs are available on each node, 
> but this is not true of accelerator cards.
>  # The existence of heterogeneous tasks (accelerator-required or not) 
> requires the scheduler to schedule tasks in a smarter way.
> So here we propose to improve the current scheduler to support heterogeneous 
> tasks (accelerator-required or not). This can be part of the work of Project 
> Hydrogen.
> Details are attached in a Google doc. It doesn't cover all the implementation 
> details, just highlights the parts that should be changed.
>  
> CC [~yanboliang] [~merlintang]






[jira] [Commented] (SPARK-26701) spark thrift server driver memory leak

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750759#comment-16750759
 ] 

Hyukjin Kwon commented on SPARK-26701:
--

Please include the "memory analysis" results and reproducible steps.

> spark thrift server driver memory leak
> --
>
> Key: SPARK-26701
> URL: https://issues.apache.org/jira/browse/SPARK-26701
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1, 2.2.2, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: wangdabin
>Priority: Major
>
> When using the Spark Thrift Server, the driver memory keeps growing and 
> finally overflows. The memory analysis results show that the 
> SparkSQLOperationManager handleToOperation object is not released, resulting 
> in memory leaks, and the final result is service downtime.






[jira] [Updated] (SPARK-26701) spark thrift server driver memory leak

2019-01-23 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26701:
-
Priority: Major  (was: Blocker)

> spark thrift server driver memory leak
> --
>
> Key: SPARK-26701
> URL: https://issues.apache.org/jira/browse/SPARK-26701
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1, 2.2.2, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: wangdabin
>Priority: Major
>
> When using the Spark Thrift Server, the driver memory keeps growing and 
> finally overflows. The memory analysis results show that the 
> SparkSQLOperationManager handleToOperation object is not released, resulting 
> in memory leaks, and the final result is service downtime.






[jira] [Commented] (SPARK-26703) Hive record writer always depends on the parquet-1.6 writer; we should fix it

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750756#comment-16750756
 ] 

Hyukjin Kwon commented on SPARK-26703:
--

To do this, we should upgrade Hive rather than switch Parquet.

> Hive record writer always depends on the parquet-1.6 writer; we should fix it 
> ---
>
> Key: SPARK-26703
> URL: https://issues.apache.org/jira/browse/SPARK-26703
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, when we use an insert-into-Hive-table related command, the 
> generated parquet file will always be version 1.6. The reason is below:
> 1. We rely on hive-exec's HiveFileFormatUtils to get the recordWriter:
> {code:java}
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
> jobConf,
> tableDesc,
> serializer.getSerializedClass,
> fileSinkConf,
> new Path(path),
> Reporter.NULL)
> {code}
> 2. We then call:
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>   TableDesc tableInfo, Class outputClass,
>   FileSinkDesc conf, Path outPath, Reporter reporter) throws 
> HiveException {
> HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, 
> tableInfo);
> try {
>   boolean isCompressed = conf.getCompressed();
>   JobConf jc_output = jc;
>   if (isCompressed) {
> jc_output = new JobConf(jc);
> String codecStr = conf.getCompressCodec();
> if (codecStr != null && !codecStr.trim().equals("")) {
>   Class<? extends CompressionCodec> codec = 
>   (Class<? extends CompressionCodec>) 
> JavaUtils.loadClass(codecStr);
>   FileOutputFormat.setOutputCompressorClass(jc_output, codec);
> }
> String type = conf.getCompressType();
> if (type != null && !type.trim().equals("")) {
>   CompressionType style = CompressionType.valueOf(type);
>   SequenceFileOutputFormat.setOutputCompressionType(jc, style);
> }
>   }
>   return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>   isCompressed, tableInfo.getProperties(), outPath, reporter);
> } catch (Exception e) {
>   throw new HiveException(e);
> }
>   }
>   public static RecordWriter getRecordWriter(JobConf jc,
>   OutputFormat outputFormat,
>   Class valueClass, boolean isCompressed,
>   Properties tableProp, Path outPath, Reporter reporter
>   ) throws IOException, HiveException {
> if (!(outputFormat instanceof HiveOutputFormat)) {
>   outputFormat = new HivePassThroughOutputFormat(outputFormat);
> }
> return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
> jc, outPath, valueClass, isCompressed, tableProp, reporter);
>   }
> {code}
> 3. Then, in MapredParquetOutputFormat:
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter 
> getHiveRecordWriter(
>   final JobConf jobConf,
>   final Path finalOutPath,
>   final Class valueClass,
>   final boolean isCompressed,
>   final Properties tableProperties,
>   final Progressable progress) throws IOException {
> LOG.info("creating new record writer..." + this);
> final String columnNameProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS);
> final String columnTypeProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
> List<String> columnNames;
> List<TypeInfo> columnTypes;
> if (columnNameProperty.length() == 0) {
>   columnNames = new ArrayList();
> } else {
>   columnNames = Arrays.asList(columnNameProperty.split(","));
> }
> if (columnTypeProperty.length() == 0) {
>   columnTypes = new ArrayList();
> } else {
>   columnTypes = 
> TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
> }
> 
> DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, 
> columnTypes), jobConf);
> return getParquerRecordWriterWrapper(realOutputFormat, jobConf, 
> finalOutPath.toString(),
> progress,tableProperties);
>   }
> {code}
> 4. Then it calls:
> {code:java}
> public ParquetRecordWriterWrapper(
>   final OutputFormat realOutputFormat,
>   final JobConf jobConf,
>   final String name,
>   final Progressable progress, Properties tableProperties) throws
>   IOException {
> try {
>   // create a TaskInputOutputContext
>   TaskAttemptID taskAttemptID = 
> TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>   if (taskAttemptID == null) {
> taskAttemptID = new TaskAttemptID();
>   }
>   taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>   LOG.info("initialize serde with table properties.");
>   initializeSerProperties(taskContext, 

[jira] [Updated] (SPARK-26705) UnsafeHashedRelation changed after being broadcast

2019-01-23 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26705:
-
Priority: Major  (was: Critical)

> UnsafeHashedRelation changed after being broadcast
> --
>
> Key: SPARK-26705
> URL: https://issues.apache.org/jira/browse/SPARK-26705
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
> Environment: spark:2.2.1
> jdk:
> java version "1.8.0_112"
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>Reporter: Lijia Liu
>Priority: Major
>
> When Spark SQL executes a broadcast join, it converts the small table into an 
> UnsafeHashedRelation. But, in our cluster, I found that the 
> UnsafeHashedRelation may change after being broadcast.
> The broadcasting process is divided into the following steps:
> 1. Collect the small table data into an Array[InternalRow]. 
> (BroadcastExchangeExec.scala)
> 2. Transform the Array[InternalRow] into an UnsafeHashedRelation object; 
> UnsafeHashedRelation uses a BytesToBytesMap to store 
> data. (BroadcastExchangeExec.scala)
> 3. Use the UnsafeHashedRelation.write function to serialize the object to a 
> byte stream. (HashedRelation.scala)
> 4. Use the UnsafeHashedRelation.read function to deserialize the byte stream 
> back into an UnsafeHashedRelation object. (HashedRelation.scala)
> I found that sometimes the BytesToBytesMap in the UnsafeHashedRelation after 
> deserialization is different from the one in the UnsafeHashedRelation before 
> serialization. For example, the number of keys becomes smaller.
> In our cluster, I check the BytesToBytesMap's key count and value count in 
> the UnsafeHashedRelation.read function to prevent incorrect results, but I 
> haven't found the real reason for this bug.
> The job in which the error occurred has some peculiarities:
> 1. The broadcast table is very big.
> 2. It does not reproduce consistently.
> 3. Many tables are broadcast in the job.
> 4. The driver's memory pressure is high.
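The validation idea mentioned above can be sketched generically (this is only an illustration of checking a count across a serialize/deserialize round trip; it is not the UnsafeHashedRelation or BytesToBytesMap code):

{code:java}
// Generic round-trip check: write the expected number of entries next to the
// payload, and fail fast on read if the deserialized count does not match.
import java.io.{DataInputStream, DataOutputStream}

def writeEntries(entries: Map[String, Array[Byte]], out: DataOutputStream): Unit = {
  out.writeInt(entries.size)                    // expected number of keys
  entries.foreach { case (k, v) =>
    val key = k.getBytes("UTF-8")
    out.writeInt(key.length); out.write(key)
    out.writeInt(v.length); out.write(v)
  }
}

def readEntries(in: DataInputStream): Map[String, Array[Byte]] = {
  val expected = in.readInt()
  val entries = (0 until expected).map { _ =>
    val key = new Array[Byte](in.readInt()); in.readFully(key)
    val value = new Array[Byte](in.readInt()); in.readFully(value)
    new String(key, "UTF-8") -> value
  }.toMap
  require(entries.size == expected,
    s"deserialized ${entries.size} keys, expected $expected")
  entries
}
{code}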






[jira] [Assigned] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-26709:
---

Assignee: Gengliang Wang  (was: Xiao Li)

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Assignee: Gengliang Wang
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 
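Until the fix lands, a possible mitigation (a sketch reusing tabLocation from the snippet above; spark.sql.optimizer.metadataOnly is the key behind SQLConf.OPTIMIZER_METADATA_ONLY) is to disable the metadata-only optimization so such aggregates scan the actual files:

{code:java}
// Workaround sketch: with the optimization off, max(partCol1)/max(col1) are
// computed from the data files instead of from partition metadata alone.
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")

val readDF = spark.read.parquet(tabLocation)
readDF.selectExpr("max(partCol1)", "max(col1)").show()
{code}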






[jira] [Commented] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750748#comment-16750748
 ] 

Hyukjin Kwon commented on SPARK-26711:
--

Hm, the results say something is wrong. 50 sec vs. 7 min sounds serious. 

> JSON Schema inference takes 15 times longer
> ---
>
> Key: SPARK-26711
> URL: https://issues.apache.org/jira/browse/SPARK-26711
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
> inferring", "No encoding") was taking an hour to run, when it used to run in 
> 4-5 minutes.
> The culprit seems to be this commit: 
> [https://github.com/apache/spark/commit/d72571e51d]
> A quick look using a profiler, and it seems to be spending 99% of its time 
> doing some kind of exception handling in JsonInferSchema.scala.
> You can reproduce in the spark-shell by recreating the data used by the 
> benchmark
> {noformat}
> scala> :paste
> val rowsNum = 100 * 1000 * 1000
> spark.sparkContext.range(0, rowsNum, 1)
> .map(_ => "a")
> .toDF("fieldA")
> .write
> .option("encoding", "UTF-8")
> .json("utf8.json")
> // Entering paste mode (ctrl-D to finish)
> // Exiting paste mode, now interpreting.
> rowsNum: Int = 100000000
> scala> 
> {noformat}
> Then you can run the test by hand starting spark-shell as so (emulating 
> SqlBasedBenchmark):
> {noformat}
>  bin/spark-shell --driver-memory 8g \
>   --conf "spark.sql.autoBroadcastJoinThreshold=1" \
>   --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
> {noformat}
> On commit d72571e51d:
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548297682225
> res0: Long = 815978 <== 13.6 minutes
> scala>
> {noformat}
> On the previous commit (86100df54b):
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548298927151
> res0: Long = 50087 <= 50 seconds
> scala> 
> {noformat}
> I also tried {{spark.read.option("inferTimestamp", 
> false).json("utf8.json")}}, but that option didn't seem to make a difference 
> in run time. Edit: {{inferTimestamp}} does, in fact, have an impact: It 
> halves the run time. However, that means even with {{inferTimestamp}}, the 
> run time is still 7 times slower than before.






[jira] [Resolved] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai resolved SPARK-26706.
-
Resolution: Resolved

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.3, 2.4.1
>
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}






[jira] [Commented] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750741#comment-16750741
 ] 

Bruce Robbins commented on SPARK-26711:
---

[~hyukjin.kwon]
 inferTimestamp=true: ~13 min
 inferTimestamp=false: ~7 min

7 minutes is a lot better than 13 minutes, but still not as good as 50 seconds.

A quick look in the profiler shows that in the case where inferTimestamp is 
_disabled_, Spark is spending 96% of its time here:
{code:java}
val bigDecimal = decimalParser(field)
{code}
That line did change in the original commit.
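To see why that line can dominate, here is a rough stand-alone micro-benchmark sketch (plain Scala, not the JsonInferSchema code; the row count is illustrative): every non-numeric field makes the decimal parse attempt construct and throw an exception.

{code:java}
// Compare a parse-and-catch loop against an empty loop of the same length.
def time(body: => Unit): Long = {
  val start = System.currentTimeMillis; body; System.currentTimeMillis - start
}

val n = 10 * 1000 * 1000
val parseAndCatch = time {
  var i = 0
  while (i < n) {
    try { new java.math.BigDecimal("a") }          // "a" is not a decimal,
    catch { case _: NumberFormatException => () }  // so this throws every time
    i += 1
  }
}
val baseline = time { var i = 0; while (i < n) i += 1 }
println(s"parse-and-catch: $parseAndCatch ms, empty loop: $baseline ms")
{code}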

> JSON Schema inference takes 15 times longer
> ---
>
> Key: SPARK-26711
> URL: https://issues.apache.org/jira/browse/SPARK-26711
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
> inferring", "No encoding") was taking an hour to run, when it used to run in 
> 4-5 minutes.
> The culprit seems to be this commit: 
> [https://github.com/apache/spark/commit/d72571e51d]
> A quick look using a profiler, and it seems to be spending 99% of its time 
> doing some kind of exception handling in JsonInferSchema.scala.
> You can reproduce in the spark-shell by recreating the data used by the 
> benchmark
> {noformat}
> scala> :paste
> val rowsNum = 100 * 1000 * 1000
> spark.sparkContext.range(0, rowsNum, 1)
> .map(_ => "a")
> .toDF("fieldA")
> .write
> .option("encoding", "UTF-8")
> .json("utf8.json")
> // Entering paste mode (ctrl-D to finish)
> // Exiting paste mode, now interpreting.
> rowsNum: Int = 100000000
> scala> 
> {noformat}
> Then you can run the test by hand starting spark-shell as so (emulating 
> SqlBasedBenchmark):
> {noformat}
>  bin/spark-shell --driver-memory 8g \
>   --conf "spark.sql.autoBroadcastJoinThreshold=1" \
>   --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
> {noformat}
> On commit d72571e51d:
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548297682225
> res0: Long = 815978 <== 13.6 minutes
> scala>
> {noformat}
> On the previous commit (86100df54b):
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548298927151
> res0: Long = 50087 <= 50 seconds
> scala> 
> {noformat}
> I also tried {{spark.read.option("inferTimestamp", 
> false).json("utf8.json")}}, but that option didn't seem to make a difference 
> in run time. Edit: {{inferTimestamp}} does, in fact, have an impact: It 
> halves the run time. However, that means even with {{inferTimestamp}}, the 
> run time is still 7 times slower than before.






[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750740#comment-16750740
 ] 

Hyukjin Kwon commented on SPARK-26413:
--

I think SPARK-26412 can be resolved together if this one is resolved.

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured 
> data. This is already true in Spark with the use of Arrow in the Pandas UDF 
> functions in the DataFrame API.
> However, the current implementation of Arrow in Spark is limited to two use 
> cases:
>  * Pandas UDFs, which allow operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas, which pulls back the entire dataset to the driver as a 
> Pandas DataFrame.
> What is still hard, however, is making use of all of the columns in a 
> DataFrame while staying distributed across the workers. The only way to do 
> this currently is to drop down into RDDs and collect the rows into a 
> dataframe; however, pickling is very slow and the collecting is expensive.
> The proposal is to extend Spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology. 
> Some examples of possibilities with this new API: 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Targets users and personas
> ML engineers, data scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Interable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to an Dataframe by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): Dataframe
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
> h3. Serializers.py
> {code:java}
> # Serializer that takes a Serialized Arrow Tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer)
> {code}
> h3. RDD.py
> {code}
> # New RDD Class that has an RDD[ArrowTable] behind it and uses the new 
> ArrowSerializer instead of the normal Pickle Serializer
> class ArrowRDD(RDD){code}
>  
> h3. Dataframe.py
> {code}
> // New Function that converts a pyspark dataframe into an ArrowRDD
> def arrow(self):
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> df.arrow.map(map_table).dataframe 
> {code}
> h3. Scala
>  
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries. This, however, does come 
> with some considerations from a Spark perspective.
>  Arrow is column-based instead of row-based. In the above API proposal of 
> RDD[ArrowTable], each RDD row will in fact be a block of data. Another 
> proposal in this regard is to introduce a new parameter to Spark called 
> arrow.sql.execution.arrow.maxRecordsPerTable. The goal of this parameter is 
> to decide how many records are included in a single Arrow Table. If set to 
> -1, the entire partition will be included in the table; otherwise, up to that 
> number. Within that number the normal blocking 

[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleService not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery is 
enabled; however, the recovery file is under a single directory, which may be 
unavailable if the disk is broken. So if an NM restart happens (maybe caused by 
a kill or some other reason), the shuffle service cannot start even if there are 
executors on the node.

This may finally cause job failures (if the node or the executors on it are not 
blacklisted), or at least it will cause resource waste (shuffles from this node 
always fail).

For long running Spark applications, this problem may be more serious.

So I think we should support multiple directories (multiple disks) for this 
recovery, and switch to a good directory when the disk of the current directory 
is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a single directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.


> Disk broken causing YarnShuffleService not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file is under a single directory, which may 
> be unavailable if the disk is broken. So if an NM restart happens (maybe 
> caused by a kill or some other reason), the shuffle service cannot start even 
> if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least it will cause resource waste (shuffles from 
> this node always fail).
> For long running Spark applications, this problem may be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the disk of the current 
> directory is broken.
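A minimal sketch of the proposed behaviour (illustrative only; the directory names and the health check are assumptions, not the actual YarnShuffleService code): probe each configured local dir and use the first one that is still usable.

{code:java}
// Sketch: choose a usable recovery directory from several candidates instead of
// relying on one fixed path, so a single broken disk does not block NM recovery.
import java.io.File

def chooseRecoveryDir(candidateRoots: Seq[String], subDir: String): Option[File] = {
  candidateRoots.iterator
    .map(root => new File(root, subDir))
    .find(dir => (dir.isDirectory || dir.mkdirs()) && dir.canWrite)  // basic health check
}

// e.g. chooseRecoveryDir(Seq("/data1/nm-local", "/data2/nm-local"), "shuffle-recovery")
{code}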






[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleService not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a single directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.


> Disk broken causing YarnShuffleService not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file is under a single directory, which may 
> be unavailable if the disk is broken. So if an NM restart happens (maybe 
> caused by a kill or some other reason), the shuffle service cannot start even 
> if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least it will cause resource waste (shuffles from 
> this node always fail).
> For long running Spark applications, this problem may be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the disk of the current 
> directory is broken.






[jira] [Commented] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Takeshi Yamamuro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750729#comment-16750729
 ] 

Takeshi Yamamuro commented on SPARK-26709:
--

I looked over the code, but I'm not working on it yet. Please go ahead ;)

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 






[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleService not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start.

What's more, if the the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleService not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file is under a fixed directory, which may 
> be unavailable if the disk is broken. So if an NM restart happens (maybe 
> caused by a kill or some other reason), the shuffle service cannot start even 
> if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least it will cause resource waste (shuffles from 
> this node always fail).
> For long running Spark applications, this problem may be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the disk of the current 
> directory is broken.






[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleService not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleService not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file is under a fixed directory, which may 
> be unavailable if the disk is broken. So if an NM restart happens (maybe 
> caused by a kill or some other reason), the shuffle service cannot start even 
> if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least it will cause resource waste (shuffles from 
> this node always fail).
> For long running Spark applications, this problem may be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the disk of the current 
> directory is broken.






[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleService not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start.

What's more, if the the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleService not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file is under a fixed directory, which may 
> be unavailable if the disk is broken. So if an NM restart happens (maybe 
> caused by a kill or some other reason), the shuffle service cannot start.
> What's more, the `ExecutorShuffleInfo` will be lost, which makes the shuffle 
> service unavailable even if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least it will cause resource waste (shuffles from 
> this node always fail).
> For long running Spark applications, this problem may be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the disk of the current 
> directory is broken.






[jira] [Resolved] (SPARK-26682) Task attempt ID collision causes lost data

2019-01-23 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26682.
-
   Resolution: Fixed
Fix Version/s: 2.4.1
   3.0.0

Issue resolved by pull request 23608
[https://github.com/apache/spark/pull/23608]

> Task attempt ID collision causes lost data
> --
>
> Key: SPARK-26682
> URL: https://issues.apache.org/jira/browse/SPARK-26682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.3.2, 2.4.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>Priority: Blocker
>  Labels: data-loss
> Fix For: 3.0.0, 2.4.1
>
>
> We recently tracked missing data to a collision in the fake Hadoop task 
> attempt ID created when using Hadoop OutputCommitters. This is similar to 
> SPARK-24589.
> A stage had one task fail to get one shard from a shuffle, causing a 
> FetchFailedException and Spark resubmitted the stage. Because only one task 
> was affected, the original stage attempt continued running tasks that had 
> been resubmitted. Another task ran two attempts concurrently on the same 
> executor, but had the same attempt number because they were from different 
> stage attempts. Because the attempt number was the same, the task used the 
> same temp locations. That caused one attempt to fail because a file path 
> already existed, and that attempt then removed the shared temp location and 
> deleted the other task's data. When the second attempt succeeded, it 
> committed partial data.
> The problem was that both attempts had the same partition and attempt 
> numbers, despite being run in different stages, and that was used to create a 
> Hadoop task attempt ID on which the temp location was based. The fix is to 
> use Spark's global task attempt ID, which is a counter, instead of attempt 
> number because attempt number is reused in stage attempts.
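To make the collision concrete, a simplified sketch (the temp-path naming here is hypothetical, not the exact committer code): two attempts of the same partition from different stage attempts collide when the path is keyed by (partition, attemptNumber), but not when keyed by the global task attempt id.

{code:java}
// The per-stage attempt number repeats across stage attempts, while Spark's
// global task attempt id is a monotonically increasing counter.
case class RunningTask(partition: Int, stageAttempt: Int, attemptNumber: Int, taskAttemptId: Long)

def pathByAttemptNumber(t: RunningTask): String =
  s"_temporary/attempt_${t.partition}_${t.attemptNumber}"

def pathByTaskAttemptId(t: RunningTask): String =
  s"_temporary/attempt_${t.partition}_${t.taskAttemptId}"

val original    = RunningTask(partition = 7, stageAttempt = 0, attemptNumber = 0, taskAttemptId = 1234L)
val resubmitted = RunningTask(partition = 7, stageAttempt = 1, attemptNumber = 0, taskAttemptId = 1301L)

assert(pathByAttemptNumber(original) == pathByAttemptNumber(resubmitted))  // same path: collision
assert(pathByTaskAttemptId(original) != pathByTaskAttemptId(resubmitted))  // unique paths
{code}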






[jira] [Assigned] (SPARK-26682) Task attempt ID collision causes lost data

2019-01-23 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26682:
---

Assignee: Ryan Blue

> Task attempt ID collision causes lost data
> --
>
> Key: SPARK-26682
> URL: https://issues.apache.org/jira/browse/SPARK-26682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.3.2, 2.4.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>Priority: Blocker
>  Labels: data-loss
>
> We recently tracked missing data to a collision in the fake Hadoop task 
> attempt ID created when using Hadoop OutputCommitters. This is similar to 
> SPARK-24589.
> A stage had one task fail to get one shard from a shuffle, causing a 
> FetchFailedException and Spark resubmitted the stage. Because only one task 
> was affected, the original stage attempt continued running tasks that had 
> been resubmitted. Another task ran two attempts concurrently on the same 
> executor, but had the same attempt number because they were from different 
> stage attempts. Because the attempt number was the same, the task used the 
> same temp locations. That caused one attempt to fail because a file path 
> already existed, and that attempt then removed the shared temp location and 
> deleted the other task's data. When the second attempt succeeded, it 
> committed partial data.
> The problem was that both attempts had the same partition and attempt 
> numbers, despite being run in different stages, and that was used to create a 
> Hadoop task attempt ID on which the temp location was based. The fix is to 
> use Spark's global task attempt ID, which is a counter, instead of attempt 
> number because attempt number is reused in stage attempts.






[jira] [Commented] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Gengliang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750714#comment-16750714
 ] 

Gengliang Wang commented on SPARK-26709:


[~maropu] I can take it. Are you working on it?

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 






[jira] [Commented] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750713#comment-16750713
 ] 

Hyukjin Kwon commented on SPARK-26711:
--

So what was the time with {{inferTimestamp}} enabled vs. disabled? It would be 
odd if there's a regression even with {{inferTimestamp}} disabled, since that 
only adds one if-else comparison.

> JSON Schema inference takes 15 times longer
> ---
>
> Key: SPARK-26711
> URL: https://issues.apache.org/jira/browse/SPARK-26711
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
> inferring", "No encoding") was taking an hour to run, when it used to run in 
> 4-5 minutes.
> The culprit seems to be this commit: 
> [https://github.com/apache/spark/commit/d72571e51d]
> A quick look using a profiler, and it seems to be spending 99% of its time 
> doing some kind of exception handling in JsonInferSchema.scala.
> You can reproduce in the spark-shell by recreating the data used by the 
> benchmark
> {noformat}
> scala> :paste
> val rowsNum = 100 * 1000 * 1000
> spark.sparkContext.range(0, rowsNum, 1)
> .map(_ => "a")
> .toDF("fieldA")
> .write
> .option("encoding", "UTF-8")
> .json("utf8.json")
> // Entering paste mode (ctrl-D to finish)
> // Exiting paste mode, now interpreting.
> rowsNum: Int = 100000000
> scala> 
> {noformat}
> Then you can run the test by hand starting spark-shell as so (emulating 
> SqlBasedBenchmark):
> {noformat}
>  bin/spark-shell --driver-memory 8g \
>   --conf "spark.sql.autoBroadcastJoinThreshold=1" \
>   --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
> {noformat}
> On commit d72571e51d:
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548297682225
> res0: Long = 815978 <== 13.6 minutes
> scala>
> {noformat}
> On the previous commit (86100df54b):
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548298927151
> res0: Long = 50087 <= 50 seconds
> scala> 
> {noformat}
> I also tried {{spark.read.option("inferTimestamp", 
> false).json("utf8.json")}}, but that option didn't seem to make a difference 
> in run time. Edit: {{inferTimestamp}} does, in fact, have an impact: it 
> halves the run time. However, even with {{inferTimestamp}} disabled, the run 
> time is still 7 times slower than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Bruce Robbins (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-26711:
--
Description: 
I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
inferring", "No encoding") was taking an hour to run, when it used to run in 
4-5 minutes.

The culprit seems to be this commit: 
[https://github.com/apache/spark/commit/d72571e51d]

A quick look using a profiler, and it seems to be spending 99% of its time 
doing some kind of exception handling in JsonInferSchema.scala.

You can reproduce in the spark-shell by recreating the data used by the 
benchmark
{noformat}
scala> :paste
val rowsNum = 100 * 1000 * 1000
spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
.toDF("fieldA")
.write
.option("encoding", "UTF-8")
.json("utf8.json")

// Entering paste mode (ctrl-D to finish)

// Exiting paste mode, now interpreting.
rowsNum: Int = 100000000
scala> 
{noformat}
Then you can run the test by hand starting spark-shell as so (emulating 
SqlBasedBenchmark):
{noformat}
 bin/spark-shell --driver-memory 8g \
  --conf "spark.sql.autoBroadcastJoinThreshold=1" \
  --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
{noformat}
On commit d72571e51d:
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548297682225
res0: Long = 815978 <== 13.6 minutes
scala>
{noformat}
On the previous commit (86100df54b):
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548298927151
res0: Long = 50087 <= 50 seconds
scala> 
{noformat}
I also tried {{spark.read.option("inferTimestamp", false).json("utf8.json")}}, 
but that option didn't seem to make a difference in run time. Edit: 
{{inferTimestamp}} does, in fact, have an impact: it halves the run time. 
However, even with {{inferTimestamp}} disabled, the run time is still 7 times 
slower than before.

  was:
I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
inferring", "No encoding") was taking an hour to run, when it used to run in 
4-5 minutes.

The culprit seems to be this commit: 
[https://github.com/apache/spark/commit/d72571e51d]

A quick look using a profiler, and it seems to be spending 99% of its time 
doing some kind of exception handling in JsonInferSchema.scala.

You can reproduce in the spark-shell by recreating the data used by the 
benchmark
{noformat}
scala> :paste
val rowsNum = 100 * 1000 * 1000
spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
.toDF("fieldA")
.write
.option("encoding", "UTF-8")
.json("utf8.json")

// Entering paste mode (ctrl-D to finish)

// Exiting paste mode, now interpreting.
rowsNum: Int = 100000000
scala> 
{noformat}
Then you can run the test by hand starting spark-shell as so (emulating 
SqlBasedBenchmark):
{noformat}
 bin/spark-shell --driver-memory 8g \
  --conf "spark.sql.autoBroadcastJoinThreshold=1" \
  --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
{noformat}
On commit d72571e51d:
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548297682225
res0: Long = 815978 <== 13.6 minutes
scala>
{noformat}
On the previous commit (86100df54b):
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548298927151
res0: Long = 50087 <= 50 seconds
scala> 
{noformat}
I also tried {{spark.read.option("inferTimestamp", false).json("utf8.json")}}, 
but that option didn't seem to make a difference in run time. Maybe I am using 
it incorrectly.


> JSON Schema inference takes 15 times longer
> ---
>
> Key: SPARK-26711
> URL: https://issues.apache.org/jira/browse/SPARK-26711
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
> inferring", "No encoding") was taking an hour to run, when it used to run in 
> 4-5 minutes.
> The culprit seems to be this commit: 
> [https://github.com/apache/spark/commit/d72571e51d]
> A quick look using a profiler, and it seems to be spending 99% of its time 
> doing some kind of exception handling in JsonInferSchema.scala.
> You can reproduce in the spark-shell by recreating the data used by the 
> benchmark
> {noformat}
> scala> :paste
> val rowsNum = 100 * 1000 * 1000
> spark.sparkContext.range(0, rowsNum, 1)
> .map(_ => "a")
> .toDF("fieldA")
> .write
> .option("encoding", "UTF-8")
> .json("utf8.json")
> // Entering paste mode (ctrl-D to 

[jira] [Commented] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750704#comment-16750704
 ] 

Bruce Robbins commented on SPARK-26711:
---

ping [~maxgekk] [~hyukjin.kwon]

> JSON Schema inference takes 15 times longer
> ---
>
> Key: SPARK-26711
> URL: https://issues.apache.org/jira/browse/SPARK-26711
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
> inferring", "No encoding") was taking an hour to run, when it used to run in 
> 4-5 minutes.
> The culprit seems to be this commit: 
> [https://github.com/apache/spark/commit/d72571e51d]
> A quick look using a profiler, and it seems to be spending 99% of its time 
> doing some kind of exception handling in JsonInferSchema.scala.
> You can reproduce in the spark-shell by recreating the data used by the 
> benchmark
> {noformat}
> scala> :paste
> val rowsNum = 100 * 1000 * 1000
> spark.sparkContext.range(0, rowsNum, 1)
> .map(_ => "a")
> .toDF("fieldA")
> .write
> .option("encoding", "UTF-8")
> .json("utf8.json")
> // Entering paste mode (ctrl-D to finish)
> // Exiting paste mode, now interpreting.
> rowsNum: Int = 100000000
> scala> 
> {noformat}
> Then you can run the test by hand starting spark-shell as so (emulating 
> SqlBasedBenchmark):
> {noformat}
>  bin/spark-shell --driver-memory 8g \
>   --conf "spark.sql.autoBroadcastJoinThreshold=1" \
>   --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
> {noformat}
> On commit d72571e51d:
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548297682225
> res0: Long = 815978 <== 13.6 minutes
> scala>
> {noformat}
> On the previous commit (86100df54b):
> {noformat}
> scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
> System.currentTimeMillis-start
> start: Long = 1548298927151
> res0: Long = 50087 <= 50 seconds
> scala> 
> {noformat}
> I also tried {{spark.read.option("inferTimestamp", 
> false).json("utf8.json")}}, but that option didn't seem to make a difference 
> in run time. Edit: {{inferTimestamp}} does, in fact, have an impact: it 
> halves the run time. However, even with {{inferTimestamp}} disabled, the run 
> time is still 7 times slower than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26689) Disk broken causing broadcast failure

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26689:
-
Summary: Disk broken causing broadcast failure  (was: Bad disk causing 
broadcast failure)

> Disk broken causing broadcast failure
> -
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disks
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk problem.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the job still fails. I think 
> this is because Spark does not currently handle a bad disk in 
> `DiskBlockManager`. In a multi-disk environment, Spark could fall back to a 
> healthy disk and avoid the application failure.
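
A minimal sketch of the idea (illustrative only; the names are assumptions, not 
the actual `DiskBlockManager` API): fall back to another configured local 
directory when creating a block subdirectory fails on one disk.

{code:java}
// illustrative sketch only -- names and structure are assumptions, not Spark's code
import java.io.{File, IOException}

def createSubDirOnAnyDisk(localDirs: Seq[File], subDirName: String): File = {
  localDirs.iterator
    .map(dir => new File(dir, subDirName))
    .find(d => d.isDirectory || d.mkdirs())   // skip dirs on broken disks
    .getOrElse(throw new IOException(
      s"Failed to create local dir $subDirName on all configured disks"))
}
{code}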



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Issue Type: Bug  (was: Improvement)

> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery is 
> enabled; however, the recovery file lives in a single fixed directory, which 
> may become unavailable if that disk breaks. So if the NM restarts (e.g. because 
> it was killed), the `ExecutorShuffleInfo` is lost, and the shuffle service 
> becomes unavailable even though there are still executors on the node.
> This can ultimately cause job failures (if the node or its executors are not 
> blacklisted) or, at the very least, waste resources, since every shuffle fetch 
> from this node fails.
> For long-running Spark applications, this problem is even more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, switching to a healthy directory when the disk hosting the current 
> one breaks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Summary: Disk broken causing YarnShuffleSerivce not available  (was: Disk 
broken caused NM recovery failure causing YarnShuffleSerivce not available)

> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery is 
> enabled; however, the recovery file lives in a single fixed directory, which 
> may become unavailable if that disk breaks. So if the NM restarts (e.g. because 
> it was killed), the `ExecutorShuffleInfo` is lost, and the shuffle service 
> becomes unavailable even though there are still executors on the node.
> This can ultimately cause job failures (if the node or its executors are not 
> blacklisted) or, at the very least, waste resources, since every shuffle fetch 
> from this node fails.
> For long-running Spark applications, this problem is even more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, switching to a healthy directory when the disk hosting the current 
> one breaks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Felix Cheung (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750696#comment-16750696
 ] 

Felix Cheung commented on SPARK-26679:
--

I find the fraction configs very confusing; there is a lot of misinformation 
about them and some wrongly hardcoded configs out there. Anyway ...

> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26712) Disk broken caused NM recovery failure causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26712:


 Summary: Disk broken caused NM recovery failure causing 
YarnShuffleSerivce not available
 Key: SPARK-26712
 URL: https://issues.apache.org/jira/browse/SPARK-26712
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery is 
enabled; however, the recovery file lives in a single fixed directory, which may 
become unavailable if that disk breaks. So if the NM restarts (e.g. because it 
was killed), the `ExecutorShuffleInfo` is lost, and the shuffle service becomes 
unavailable even though there are still executors on the node.

This can ultimately cause job failures (if the node or its executors are not 
blacklisted) or, at the very least, waste resources, since every shuffle fetch 
from this node fails.

For long-running Spark applications, this problem is even more serious.

So I think we should support multiple directories (multiple disks) for this 
recovery, switching to a healthy directory when the disk hosting the current 
one breaks.
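
A minimal sketch of what I have in mind (illustrative only; the names are made 
up and this is not the actual YarnShuffleService recovery code): probe the 
configured local dirs and place the recovery DB on the first healthy one.

{code:java}
import java.io.File

// illustrative sketch only -- not the actual YarnShuffleService API
def chooseRecoveryDir(candidateDirs: Seq[File], dbName: String): Option[File] =
  candidateDirs
    .find(d => d.isDirectory && d.canWrite)   // skip dirs on broken disks
    .map(healthy => new File(healthy, dbName))
{code}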



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750694#comment-16750694
 ] 

Hyukjin Kwon commented on SPARK-26679:
--

Also, if it controls a ratio relative to the whole available (or limited) 
memory space, then I guess the ratio here 
(https://github.com/apache/spark/pull/21977#issuecomment-456163212) should be 
fixed and made configurable via {{spark.memory.fraction}}, not 
{{spark.python.worker.memory}}.

> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750689#comment-16750689
 ] 

Hyukjin Kwon commented on SPARK-26679:
--

What exactly does the current {{spark.memory.fraction}} control? In the case of 
PySpark, that might be different, since all of the memory is for execution, not 
for caching, storage, or extra calculation. It might ser/de some big instances, 
but that's also counted as part of execution.

> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750594#comment-16750594
 ] 

Hyukjin Kwon edited comment on SPARK-26679 at 1/24/19 3:30 AM:
---

{quote}
There are two extreme cases: (1) an app which does a ton of stuff in python and 
uses a lot of python memory from user, but no usage of the sort machinery and 
(2) an app which uses the sort machinery within python, but makes very little 
use of allocating memory from user code. 
{quote}

Yes, but I was wondering if we have some Spark configurations for both cases 
somewhere in core, for instance (correct me if I am mistaken). The current sort 
machinery in Python looks like it checks the given hard limit against a shared 
memory space (for instance, if the limit is set to 500MB and Python somehow uses 
300MB for other purposes, the sort will use 200MB and then spill). So I guess it 
won't matter how much memory the user's own code uses. The configuration 
{{spark.python.worker.memory}} looks like it was initially introduced with 
similar targets but for a different purpose.




was (Author: hyukjin.kwon):
{quote}
There are two extreme cases: (1) an app which does a ton of stuff in python and 
uses a lot of python memory from user, but no usage of the sort machinery and 
(2) an app which uses the sort machinery within python, but makes very little 
use of allocating memory from user code. 
{quote}

Yes, but I was wondering if we have some Spark configurations for both cases 
in, for instance, somewhere core (correct me if I am mistaken). Current sort 
machinery in Python looks checking the given hard limit for a shared memory 
space (for instance, if somehow Python uses 300MB for other purpose, Spark will 
use 200MB and spills if that's set to 500MB). So, it guess it won't be a matter 
if user uses how much memory they use. The configuration 
{{spark.python.worker.memory}} looks initially introduced by the same targets 
but for different purposes.



> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2019-01-23 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750674#comment-16750674
 ] 

David Vogelbacher commented on SPARK-24437:
---

[~DaveDeCaprio] I have not tested it yet but 
https://issues.apache.org/jira/browse/SPARK-25998 and its associated 
[PR|https://github.com/apache/spark/pull/22995] might help here.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is registered for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held in some other collection and never becomes 
> eligible for GC, so ContextCleaner is not able to clean it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26710) ImageSchemaSuite has some errors when running it in local laptop

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26710:


Assignee: Apache Spark

> ImageSchemaSuite has some errors when running it in local laptop
> 
>
> Key: SPARK-26710
> URL: https://issues.apache.org/jira/browse/SPARK-26710
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.4.0
>Reporter: xubo245
>Assignee: Apache Spark
>Priority: Major
>
> ImageSchemaSuite and org.apache.spark.ml.source.image.ImageFileFormatSuite 
> have some errors when run on a local laptop
> {code:java}
> execute, tree:
> Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#17L])
>+- *(1) Project
>   +- *(1) Scan ExistingRDD[image#10]
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#17L])
>+- *(1) Project
>   +- *(1) Scan ExistingRDD[image#10]
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:129)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:488)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:429)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:428)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:472)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:154)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:719)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
>   at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2756)
>   at 
> org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2755)
>   at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3291)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3287)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2755)
>   at 
> org.apache.spark.ml.image.ImageSchemaSuite.$anonfun$new$2(ImageSchemaSuite.scala:53)
>   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
>   at 
> org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
>   at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
>   at 
> 

[jira] [Commented] (SPARK-26689) Bad disk causing broadcast failure

2019-01-23 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750669#comment-16750669
 ] 

liupengcheng commented on SPARK-26689:
--

[~tgraves] We use YARN as the resource manager, and we run Spark applications 
on YARN with Spark 2.1.0. This information is already provided in the 
environment field. Is there any more information you would like me to provide? 
BTW, I don't think this exception is related to the resource manager.

> Bad disk causing broadcast failure
> --
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disks
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk problem.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the job still fails. I think 
> this is because Spark does not currently handle a bad disk in 
> `DiskBlockManager`. In a multi-disk environment, Spark could fall back to a 
> healthy disk and avoid the application failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2019-01-23 Thread Fengyu Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750667#comment-16750667
 ] 

Fengyu Cao commented on SPARK-26389:


A force clean-up flag may help when HDFS is not used.

The size of the temp checkpoints outside HDFS is not acceptable for me:
 # nginx logs grouped by uid (1h window, 5min slide window)
 # running for about 4 hours on 2 executor hosts (default trigger)
 # more than 1 GB on each host

It seems the HDFS state store cleanup logic does not work well on a non-HDFS 
file system (xfs).

 

Thanks anyway. Should I close this issue or change its type/priority?
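
As a stopgap (a sketch only; it assumes a durable path, e.g. on HDFS or a shared 
mount, is reachable, and that {{df}} is the streaming DataFrame), pinning the 
query to an explicit checkpoint location avoids the executor-local temporary 
directories entirely:

{code:java}
// sketch: use an explicit, durable checkpoint location instead of /tmp/temporary-*
val query = df.writeStream
  .option("checkpointLocation", "hdfs:///checkpoints/my-query")  // assumed path
  .format("console")
  .start()
{code}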

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos://<master> --conf 
> spark.streaming.stopGracefullyOnShutdown=true <framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on executor not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26710) ImageSchemaSuite has some errors when running it in local laptop

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26710:


Assignee: (was: Apache Spark)

> ImageSchemaSuite has some errors when running it in local laptop
> 
>
> Key: SPARK-26710
> URL: https://issues.apache.org/jira/browse/SPARK-26710
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.4.0
>Reporter: xubo245
>Priority: Major
>
> ImageSchemaSuite and org.apache.spark.ml.source.image.ImageFileFormatSuite 
> have some errors when run on a local laptop
> {code:java}
> execute, tree:
> Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#17L])
>+- *(1) Project
>   +- *(1) Scan ExistingRDD[image#10]
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#17L])
>+- *(1) Project
>   +- *(1) Scan ExistingRDD[image#10]
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:129)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:488)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:429)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:428)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:472)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:154)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:719)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
>   at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2756)
>   at 
> org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2755)
>   at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3291)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3287)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2755)
>   at 
> org.apache.spark.ml.image.ImageSchemaSuite.$anonfun$new$2(ImageSchemaSuite.scala:53)
>   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
>   at 
> org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
>   at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
>   at 
> 

[jira] [Created] (SPARK-26711) JSON Schema inference takes 15 times longer

2019-01-23 Thread Bruce Robbins (JIRA)
Bruce Robbins created SPARK-26711:
-

 Summary: JSON Schema inference takes 15 times longer
 Key: SPARK-26711
 URL: https://issues.apache.org/jira/browse/SPARK-26711
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Bruce Robbins


I noticed that the first benchmark/case of JSONBenchmark ("JSON schema 
inferring", "No encoding") was taking an hour to run, when it used to run in 
4-5 minutes.

The culprit seems to be this commit: 
[https://github.com/apache/spark/commit/d72571e51d]

A quick look using a profiler, and it seems to be spending 99% of its time 
doing some kind of exception handling in JsonInferSchema.scala.

You can reproduce in the spark-shell by recreating the data used by the 
benchmark
{noformat}
scala> :paste
val rowsNum = 100 * 1000 * 1000
spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
.toDF("fieldA")
.write
.option("encoding", "UTF-8")
.json("utf8.json")

// Entering paste mode (ctrl-D to finish)

// Exiting paste mode, now interpreting.
rowsNum: Int = 100000000
scala> 
{noformat}
Then you can run the test by hand starting spark-shell as so (emulating 
SqlBasedBenchmark):
{noformat}
 bin/spark-shell --driver-memory 8g \
  --conf "spark.sql.autoBroadcastJoinThreshold=1" \
  --conf "spark.sql.shuffle.partitions=1" --master "local[1]"
{noformat}
On commit d72571e51d:
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548297682225
res0: Long = 815978 <== 13.6 minutes
scala>
{noformat}
On the previous commit (86100df54b):
{noformat}
scala> val start = System.currentTimeMillis; spark.read.json("utf8.json");  
System.currentTimeMillis-start
start: Long = 1548298927151
res0: Long = 50087 <= 50 seconds
scala> 
{noformat}
I also tried {{spark.read.option("inferTimestamp", false).json("utf8.json")}}, 
but that option didn't seem to make a difference in run time. Maybe I am using 
it incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26710) ImageSchemaSuite has some errors when running it in local laptop

2019-01-23 Thread xubo245 (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xubo245 updated SPARK-26710:

Description: 
ImageSchemaSuite and org.apache.spark.ml.source.image.ImageFileFormatSuite have 
some errors when run on a local laptop


{code:java}

execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#17L])
   +- *(1) Project
  +- *(1) Scan ExistingRDD[image#10]

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#17L])
   +- *(1) Project
  +- *(1) Scan ExistingRDD[image#10]

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:129)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:488)
at 
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:429)
at 
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:428)
at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:472)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:154)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:719)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2756)
at 
org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2755)
at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3291)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3287)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2755)
at 
org.apache.spark.ml.image.ImageSchemaSuite.$anonfun$new$2(ImageSchemaSuite.scala:53)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
at 
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at 
org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
at 

[jira] [Created] (SPARK-26710) ImageSchemaSuite has some errors when running it in local laptop

2019-01-23 Thread xubo245 (JIRA)
xubo245 created SPARK-26710:
---

 Summary: ImageSchemaSuite has some errors when running it in local 
laptop
 Key: SPARK-26710
 URL: https://issues.apache.org/jira/browse/SPARK-26710
 Project: Spark
  Issue Type: Bug
  Components: Tests
Affects Versions: 2.4.0
Reporter: xubo245


ImageSchemaSuite has some errors when run on a local laptop


{code:java}

execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#17L])
   +- *(1) Project
  +- *(1) Scan ExistingRDD[image#10]

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#17L])
   +- *(1) Project
  +- *(1) Scan ExistingRDD[image#10]

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:129)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:488)
at 
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:429)
at 
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:428)
at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:472)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:154)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:719)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2756)
at 
org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2755)
at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3291)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3287)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2755)
at 
org.apache.spark.ml.image.ImageSchemaSuite.$anonfun$new$2(ImageSchemaSuite.scala:53)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
at 
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at 
org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at 

[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750660#comment-16750660
 ] 

Imran Rashid commented on SPARK-26679:
--

The Java side has the same problem. Spark has no idea how much memory the user 
needs for their own code; that's why there is a spark.memory.fraction option. 
I think the new option "spark.executor.pyspark.memory" is close to 
"spark.executor.memory" (which sets -Xmx for each executor) on the Java side. 
Most users don't mess with spark.memory.fraction, but it's there to cover those 
extreme cases.
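
To make the analogy concrete, an illustrative submit command (the values are 
made up) pairing the two knobs on each side:

{noformat}
# illustrative values only
# spark.executor.memory         -> JVM heap cap per executor (-Xmx)
# spark.memory.fraction         -> share of that heap for execution/storage
# spark.executor.pyspark.memory -> hard cap per Python worker (Spark 2.4.0+)
# spark.python.worker.memory    -> spill threshold used by the Python-side sorter
spark-submit \
  --conf spark.executor.memory=4g \
  --conf spark.memory.fraction=0.6 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.python.worker.memory=512m \
  my_job.py
{noformat}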

> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26617) CacheManager blocks during requery

2019-01-23 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26617.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23539
[https://github.com/apache/spark/pull/23539]

> CacheManager blocks during requery
> --
>
> Key: SPARK-26617
> URL: https://issues.apache.org/jira/browse/SPARK-26617
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dave DeCaprio
>Assignee: Dave DeCaprio
>Priority: Minor
>  Labels: sql
> Fix For: 3.0.0
>
>
> This is related to SPARK-26548. The CacheManager also takes locks when 
> uncaching and recaching plans, and holds the lock while the query optimizer 
> runs. If the optimizer takes a long time, the whole system can block waiting 
> for that optimization run.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26617) CacheManager blocks during requery

2019-01-23 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26617:
---

Assignee: Dave DeCaprio

> CacheManager blocks during requery
> --
>
> Key: SPARK-26617
> URL: https://issues.apache.org/jira/browse/SPARK-26617
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dave DeCaprio
>Assignee: Dave DeCaprio
>Priority: Minor
>  Labels: sql
>
> This is related to Spark-26548.  The CacheManager also holds locks in 
> uncaching and recaching plans that hold the lock while the query optimizer 
> runs.  If the optimizer takes a long time, then the whole system can block up 
> waiting for the optimization run.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25713) Implement copy() for ColumnarArray

2019-01-23 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-25713.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23569
[https://github.com/apache/spark/pull/23569]

> Implement copy() for ColumnarArray
> --
>
> Key: SPARK-25713
> URL: https://issues.apache.org/jira/browse/SPARK-25713
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Liwen Sun
>Priority: Minor
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16731500#comment-16731500
 ] 

Hyukjin Kwon edited comment on SPARK-26413 at 1/24/19 1:39 AM:
---

I agree the usage can be limited for now; however, it looks like that's already 
what's going to happen if we add {{RDD[ArrowTable]}} <-> {{DataFrame}}.
I thought we were going to reuse this code path.

I vaguely assume that the API suggested in the JIRA and the code below are 
basically similar:

{code}
def arrowTableToRows(tables: Iterator[ArrowTable]): Iterator[Row] = {
  val arrowTable = tables.next()
  assert(!tables.hasNext)
  // convert the arrow table to rows
  ???
}

def rowsToArrowTable(rows: Iterator[Row]): Iterator[ArrowTable] = {
  // convert the rows to an arrow table
  Iterator.single(???)
}
{code}

so that you can use:

{code}
val arrowRDD = df.rdd.mapPartitions(rowsToArrowTable)
val df2 = arrowRDD.mapPartitions(arrowTableToRows).toDF
{code}

Like:

{code}
val df = spark.range(100).repartition(10)
val rdd = df.rdd.mapPartitions(iter => Iterator.single(iter.toArray))
val df2 = rdd.mapPartitions(iter => iter.next.iterator).toDF
df2.show()
{code}

If the advantage of adding it as an RDD API is mainly to avoid extra RDD 
operations (correct me if I'm mistaken), I doubt we should add them as RDD APIs 
if this can be resolved by adding, for instance, {{arrowTableToRows}} and 
{{rowsToArrowTable}}.



was (Author: hyukjin.kwon):
I agree the usage can be limited for now; however, looks thats already what's 
going to happen if we add {{RDD[ArrowTable]}} <-> {{DataFrame}}.
I thought we're going to reuse this code path.

I vaguely assume that the API suggested in the JIRA, and the codes below are 
basically similar:

{code}
def arrowTableToRows(table: Iterator[ArrowTable]): Iterator[Row] {
  val arrowTable = table.next()
  assert !table.hasNext
  // convert arrow table to rows
}

def rowsToArrowTable: Iterator[Row]): Iterator[ArrowTable] {
  // convert rows to arrow table
  Iterator.single(...)
}
{code}

so that you can use:

{code}
val arrowRDD = df.rdd.mapPartitions(rowsToArrowTable)
val df = arrowRDD.mapPartitions(rowsToArrowTable).toDF
{code}

Like:

{code}
val df = spark.range(100).repartition(10)
val rdd = df.rdd.mapPartitions(iter => Iterator.single(iter.toArray))
val df = rdd.mapPartitions(iter => iter.next.iterator).toDF
df.show()
{code}

If the advantage of adding it as an RDD API is mainly to avoid extra RDD 
operations (correct me if I'm mistaken), I doubt if we should add them as an 
RDD APIs, if this can be resolved by adding, for instance, {{arrowTableToRows}} 
and {{rowsToArrowTable}}.


> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar Structured 
> Data.  This is already true in Spark with the use of arrow in the pandas udf 
> functions in the dataframe API.
> However the current implementation of arrow in spark is limited to two use 
> cases.
>  * Pandas UDF that allows for operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a 
> Pandas Dataframe.
> What is still hard however is making use of all of the columns in a Dataframe 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a dataframe. 
> However pickling is very slow and the collecting is expensive.
> The proposal is to extend spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology.  
> Some examples of possibilities with this new API. 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Targets users and personas
> ML, Data Scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables

[jira] [Commented] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Takeshi Yamamuro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750595#comment-16750595
 ] 

Takeshi Yamamuro commented on SPARK-26709:
--

Is anyone already working on this?

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 
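
Until a fix lands, one possible stop-gap (my assumption, not something verified 
in this ticket) is to turn the optimization off so the aggregates are computed 
from the data rather than from partition metadata:

{code}
// SQLConf.OPTIMIZER_METADATA_ONLY
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
{code}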



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750594#comment-16750594
 ] 

Hyukjin Kwon commented on SPARK-26679:
--

{quote}
There are two extreme cases: (1) an app which does a ton of stuff in python and 
uses a lot of python memory from user, but no usage of the sort machinery and 
(2) an app which uses the sort machinery within python, but makes very little 
use of allocating memory from user code. 
{quote}

Yes, but I was wondering if we already have Spark configurations for both cases 
somewhere in, for instance, core (correct me if I am mistaken). The current sort 
machinery in Python appears to check the given hard limit against a shared 
memory space (for instance, if the limit is set to 500MB and Python somehow uses 
300MB for other purposes, Spark will use 200MB and then spill). So I guess it 
won't matter how much memory the user's code uses. The configuration 
{{spark.python.worker.memory}} looks like it was initially introduced with a 
similar target but for a different purpose.
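
Spelling out the numbers from that example (illustrative only):

{code}
// Numbers taken from the example above.
val pysparkMemoryLimitMb = 500                                // the shared hard limit
val otherPythonUsageMb = 300                                  // python memory used elsewhere
val sortBudgetMb = pysparkMemoryLimitMb - otherPythonUsageMb  // 200MB before spilling
{code}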



> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Affects Version/s: (was: 3.0.0)

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.3, 2.4.1
>
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}
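
For illustration, a sketch (my own, not the actual change) of a precedence-based 
check under which narrowing casts such as Long -> Byte are flagged the same way 
as Long -> Short:

{code}
import org.apache.spark.sql.types._

val numericPrecedence: Seq[DataType] =
  Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)

// A cast may truncate if it moves down the precedence ladder.
def mayTruncateSketch(from: DataType, to: DataType): Boolean = {
  val fromIdx = numericPrecedence.indexOf(from)
  val toIdx = numericPrecedence.indexOf(to)
  fromIdx >= 0 && toIdx >= 0 && toIdx < fromIdx
}

// mayTruncateSketch(LongType, ByteType)  == true
// mayTruncateSketch(ByteType, LongType)  == false
{code}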



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Affects Version/s: (was: 2.4.1)
   (was: 2.3.3)
   2.3.2
   2.4.0

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.2, 2.4.0, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.3, 2.4.1
>
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Fix Version/s: (was: 2.3.4)
   2.3.3

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.3, 2.4.1
>
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Fix Version/s: 2.4.1
   2.3.4

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.4, 2.4.1
>
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-26709:
---

Assignee: Xiao Li

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the files with zero record

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26709:

Summary: OptimizeMetadataOnlyQuery does not correctly handle the files with 
zero record  (was: OptimizeMetadataOnlyQuery does not correctly handle the 
empty files)

> OptimizeMetadataOnlyQuery does not correctly handle the files with zero record
> --
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26679) Deconflict spark.executor.pyspark.memory and spark.python.worker.memory

2019-01-23 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750504#comment-16750504
 ] 

Imran Rashid commented on SPARK-26679:
--

I agree the old name "spark.python.worker.memory" is very confusing.  But I 
also don't see how you'd combine them.  There are two extreme cases: (1) an app 
which does a ton of stuff in python and uses a lot of python memory from user 
code, but makes no use of the sort machinery, and (2) an app which uses the sort 
machinery within python, but makes very little use of allocating memory from 
user code.  I don't think you have enough hooks into python's memory allocation 
to automatically spill if suddenly the user is trying to allocate more memory 
from python.

I agree it might make more sense to do something like spark.memory.fraction.  
I'm not sure if we should reuse that config for deciding what fraction of the 
pyspark memory goes to the pyspark shuffle machinery, or if there should be a 
new config spark.memory.pyspark.fraction.  (I can't think of a use case for 
keeping those separate)
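
Roughly, with a fraction-style config (the name spark.memory.pyspark.fraction is 
hypothetical here, and the arithmetic is only illustrative):

{code}
val pysparkLimitMb = 2048                   // spark.executor.pyspark.memory
val pysparkShuffleFraction = 0.6            // hypothetical spark.memory.pyspark.fraction
val spillThresholdMb = (pysparkLimitMb * pysparkShuffleFraction).toInt  // 1228
// The python sort/shuffle machinery would spill beyond ~1228MB, leaving the
// remainder of the 2048MB cap for user code.
{code}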

> Deconflict spark.executor.pyspark.memory and spark.python.worker.memory
> ---
>
> Key: SPARK-26679
> URL: https://issues.apache.org/jira/browse/SPARK-26679
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Ryan Blue
>Priority: Major
>
> In 2.4.0, spark.executor.pyspark.memory was added to limit the total memory 
> space of a python worker. There is another RDD setting, 
> spark.python.worker.memory that controls when Spark decides to spill data to 
> disk. These are currently similar, but not related to one another.
> PySpark should probably use spark.executor.pyspark.memory to limit or default 
> the setting of spark.python.worker.memory because the latter property 
> controls spilling and should be lower than the total memory limit. Renaming 
> spark.python.worker.memory would also help clarity because it sounds like it 
> should control the limit, but is more like the JVM setting 
> spark.memory.fraction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the empty files

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26709:

Description: 
{code:java}
import org.apache.spark.sql.functions.lit
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
  withTempPath { path =>
val tabLocation = path.getAbsolutePath
val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
val df = spark.emptyDataFrame.select(lit(1).as("col1"))
df.write.parquet(partLocation.toString)
val readDF = spark.read.parquet(tabLocation)
checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
  }
}
{code}

OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
empty records for partitioned tables. The above test will fail in 2.4, which 
can generate an empty file, but the underlying issue in the read path still 
exists in 2.3, 2.2 and 2.1. 

  was:
{code:java}
import org.apache.spark.sql.functions.lit
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
  withTempPath { path =>
val tabLocation = path.getAbsolutePath
val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
val df = spark.emptyDataFrame.select(lit(1).as("col1"))
df.write.parquet(partLocation.toString)
val readDF = spark.read.parquet(tabLocation)
checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
  }
}
{code}

OptimizeMetadataOnlyQuery has a correctness bug to handle the file with the 
empty records for partitioned tables. The above test will fail in 2.4, which 
can write an empty file, but the underlying issue in the read path still exists 
in 2.3, 2.2 and 2.1. 


> OptimizeMetadataOnlyQuery does not correctly handle the empty files
> ---
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can generate an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the empty files

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26709:

Description: 
{code:java}
import org.apache.spark.sql.functions.lit
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
  withTempPath { path =>
val tabLocation = path.getAbsolutePath
val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
val df = spark.emptyDataFrame.select(lit(1).as("col1"))
df.write.parquet(partLocation.toString)
val readDF = spark.read.parquet(tabLocation)
checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
  }
}
{code}

OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
empty records for partitioned tables. The above test will fail in 2.4, which 
can write an empty file, but the underlying issue in the read path still exists 
in 2.3, 2.2 and 2.1. 

  was:
{code:java}
import org.apache.spark.sql.functions.lit
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
  withTempPath { path =>
val tabLocation = path.getAbsolutePath
val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
val df = spark.emptyDataFrame.select(lit(1).as("col1"))
df.write.parquet(partLocation.toString)
val readDF = spark.read.parquet(tabLocation)
checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
  }
}
{code}

OptimizeMetadataOnlyQuery has a correctness bug to handle the file with the 
empty records for partitioned tables. 


> OptimizeMetadataOnlyQuery does not correctly handle the empty files
> ---
>
> Key: SPARK-26709
> URL: https://issues.apache.org/jira/browse/SPARK-26709
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.2.3, 2.3.2, 2.4.0
>Reporter: Xiao Li
>Priority: Blocker
>  Labels: correctness
>
> {code:java}
> import org.apache.spark.sql.functions.lit
> withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
>   withTempPath { path =>
> val tabLocation = path.getAbsolutePath
> val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
> val df = spark.emptyDataFrame.select(lit(1).as("col1"))
> df.write.parquet(partLocation.toString)
> val readDF = spark.read.parquet(tabLocation)
> checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
> checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
>   }
> }
> {code}
> OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
> empty records for partitioned tables. The above test will fail in 2.4, which 
> can write an empty file, but the underlying issue in the read path still 
> exists in 2.3, 2.2 and 2.1. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26709) OptimizeMetadataOnlyQuery does not correctly handle the empty files

2019-01-23 Thread Xiao Li (JIRA)
Xiao Li created SPARK-26709:
---

 Summary: OptimizeMetadataOnlyQuery does not correctly handle the 
empty files
 Key: SPARK-26709
 URL: https://issues.apache.org/jira/browse/SPARK-26709
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0, 2.3.2, 2.2.3, 2.1.3
Reporter: Xiao Li


{code:java}
import org.apache.spark.sql.functions.lit
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
  withTempPath { path =>
val tabLocation = path.getAbsolutePath
val partLocation = new Path(path.getAbsolutePath, "partCol1=3")
val df = spark.emptyDataFrame.select(lit(1).as("col1"))
df.write.parquet(partLocation.toString)
val readDF = spark.read.parquet(tabLocation)
checkAnswer(readDF.selectExpr("max(partCol1)"), Row(null))
checkAnswer(readDF.selectExpr("max(col1)"), Row(null))
  }
}
{code}

OptimizeMetadataOnlyQuery has a correctness bug when handling files with 
empty records for partitioned tables. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26706:
--
Affects Version/s: 2.0.2
   2.1.3
   2.2.3

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26688) Provide configuration of initially blacklisted YARN nodes

2019-01-23 Thread Sergey (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750498#comment-16750498
 ] 

Sergey commented on SPARK-26688:


Hi there!

I'm very glad that the community paid attention to my question. Let me try to 
explain the use case.

There is a 1K-node cluster, and jobs suffer performance degradation because of a 
single node. It's rather hard to convince Cluster Ops to decommission a node 
because of "performance degradation". Imagine 10 dev teams chasing a single ops 
team, either for a valid reason (the node has problems) or because the code has 
a bug, the data is skewed, or there are spots on the sun. We can't just 
decommission a node because a random dev complains. 

Simple solution:
 * Rerun the failed / delayed job and blacklist the "problematic" node in advance.
 * Report the problem if the job then works without anomalies. 
 * Ops collect complaints about the node and start to decommission it when a 
"complaint threshold" is reached. There is a rather low probability that many 
loosely coupled teams with loosely coupled jobs complain about a single node. 

Results:
 * Ops are not spammed with random requests from devs.
 * Devs are not blocked because of a really bad node.
 * It's very cheap for everyone to "blacklist" a node during job submission 
without doing anything to the node. 
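
Concretely, submission could look roughly like this; the property name below is 
just a placeholder, the real one would be decided in the PR:

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.blacklist.nodes", "bad-node-017.example.com")  // hypothetical key
{code}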

> Provide configuration of initially blacklisted YARN nodes
> -
>
> Key: SPARK-26688
> URL: https://issues.apache.org/jira/browse/SPARK-26688
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Attila Zsolt Piros
>Priority: Major
>
> Introducing a new config for initially blacklisted YARN nodes.
> This came up in the apache spark user mailing list: 
> [http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-is-it-possible-to-manually-blacklist-nodes-before-running-spark-job-td34395.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26682) Task attempt ID collision causes lost data

2019-01-23 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750486#comment-16750486
 ] 

Shixiong Zhu commented on SPARK-26682:
--

IIUC, this issue will cause a file deletion (delete the temp file) and a file 
rename (move the temp file to the target file) to happen at the same time. Could 
you clarify why this will cause a task to commit partial data? I think the file 
rename should either move the whole file to the target file, or just fail, 
right?

> Task attempt ID collision causes lost data
> --
>
> Key: SPARK-26682
> URL: https://issues.apache.org/jira/browse/SPARK-26682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.3.2, 2.4.0
>Reporter: Ryan Blue
>Priority: Blocker
>  Labels: data-loss
>
> We recently tracked missing data to a collision in the fake Hadoop task 
> attempt ID created when using Hadoop OutputCommitters. This is similar to 
> SPARK-24589.
> A stage had one task fail to get one shard from a shuffle, causing a 
> FetchFailedException and Spark resubmitted the stage. Because only one task 
> was affected, the original stage attempt continued running tasks that had 
> been resubmitted. Another task ran two attempts concurrently on the same 
> executor, but had the same attempt number because they were from different 
> stage attempts. Because the attempt number was the same, the task used the 
> same temp locations. That caused one attempt to fail because a file path 
> already existed, and that attempt then removed the shared temp location and 
> deleted the other task's data. When the second attempt succeeded, it 
> committed partial data.
> The problem was that both attempts had the same partition and attempt 
> numbers, despite being run in different stages, and that was used to create a 
> Hadoop task attempt ID on which the temp location was based. The fix is to 
> use Spark's global task attempt ID, which is a counter, instead of attempt 
> number because attempt number is reused in stage attempts.
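
To illustrate the distinction described above (a sketch of mine, not the patch 
itself), inside a running task:

{code}
import org.apache.spark.TaskContext

val ctx = TaskContext.get()

// Reused across stage attempts of the same stage: two concurrent attempts of the
// same partition can share this triple, and hence the same fake Hadoop attempt
// ID and temp location.
val collidingKey = (ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())

// A global counter, unique for every task attempt in the application.
val uniqueKey = ctx.taskAttemptId()
{code}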



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26688) Provide configuration of initially blacklisted YARN nodes

2019-01-23 Thread Mridul Muralidharan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750477#comment-16750477
 ] 

Mridul Muralidharan commented on SPARK-26688:
-

If this is a legitimate use case, we should get the YARN team to enhance node 
label support.

> Provide configuration of initially blacklisted YARN nodes
> -
>
> Key: SPARK-26688
> URL: https://issues.apache.org/jira/browse/SPARK-26688
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Attila Zsolt Piros
>Priority: Major
>
> Introducing a new config for initially blacklisted YARN nodes.
> This came up in the apache spark user mailing list: 
> [http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-is-it-possible-to-manually-blacklist-nodes-before-running-spark-job-td34395.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750466#comment-16750466
 ] 

Dongjoon Hyun commented on SPARK-26706:
---

I updated the affected versions, too.

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26706:
--
Affects Version/s: 1.6.3

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Priority: Blocker  (was: Critical)

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Blocker
>  Labels: correctness
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Labels: correctness  (was: )

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Critical
>  Labels: correctness
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26708:

Description: When performing non-cascading cache invalidation, {{recache}} 
is called on the other cache entries which are dependent on the cache being 
invalidated. It leads to the physical plans of those cache entries being 
re-compiled. For those cache entries, if the cache RDD has already been 
persisted, chances are there will be inconsistency between the data and the new 
plan. It can cause a correctness issue if the new plan's {{outputPartitioning}} 
or {{outputOrdering}} is different from that of the actual data, and 
meanwhile the cache is used by another query that asks for a specific 
{{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
plan but not the actual data.

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Maryann Xue
>Priority: Blocker
>  Labels: correctness
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> It leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cache RDD has already been persisted, chances 
> are there will be inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for a specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-01-23 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26708:

Labels: correctness  (was: )

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Maryann Xue
>Priority: Blocker
>  Labels: correctness
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-01-23 Thread Xiao Li (JIRA)
Xiao Li created SPARK-26708:
---

 Summary: Incorrect result caused by inconsistency between a SQL 
cache's cached RDD and its physical plan
 Key: SPARK-26708
 URL: https://issues.apache.org/jira/browse/SPARK-26708
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Xiao Li
Assignee: Maryann Xue






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26706:


Assignee: Anton Okolnychyi  (was: Apache Spark)

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Critical
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26682) Task attempt ID collision causes lost data

2019-01-23 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-26682:
---
Target Version/s: 2.3.3, 2.4.1
  Labels: data-loss  (was: )
Priority: Blocker  (was: Major)
  Issue Type: Bug  (was: Improvement)

> Task attempt ID collision causes lost data
> --
>
> Key: SPARK-26682
> URL: https://issues.apache.org/jira/browse/SPARK-26682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.3, 2.3.2, 2.4.0
>Reporter: Ryan Blue
>Priority: Blocker
>  Labels: data-loss
>
> We recently tracked missing data to a collision in the fake Hadoop task 
> attempt ID created when using Hadoop OutputCommitters. This is similar to 
> SPARK-24589.
> A stage had one task fail to get one shard from a shuffle, causing a 
> FetchFailedException and Spark resubmitted the stage. Because only one task 
> was affected, the original stage attempt continued running tasks that had 
> been resubmitted. Another task ran two attempts concurrently on the same 
> executor, but had the same attempt number because they were from different 
> stage attempts. Because the attempt number was the same, the task used the 
> same temp locations. That caused one attempt to fail because a file path 
> already existed, and that attempt then removed the shared temp location and 
> deleted the other task's data. When the second attempt succeeded, it 
> committed partial data.
> The problem was that both attempts had the same partition and attempt 
> numbers, despite being run in different stages, and that was used to create a 
> Hadoop task attempt ID on which the temp location was based. The fix is to 
> use Spark's global task attempt ID, which is a counter, instead of attempt 
> number because attempt number is reused in stage attempts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26706:


Assignee: Apache Spark  (was: Anton Okolnychyi)

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Apache Spark
>Priority: Critical
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-+
> |value|
> +-+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24615) Accelerator-aware task scheduling for Spark

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750375#comment-16750375
 ] 

Thomas Graves commented on SPARK-24615:
---

[~jerryshao]  just curious where this is at, are you still working on it?

> Accelerator-aware task scheduling for Spark
> ---
>
> Key: SPARK-24615
> URL: https://issues.apache.org/jira/browse/SPARK-24615
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Saisai Shao
>Priority: Major
>  Labels: Hydrogen, SPIP
>
> In the machine learning area, accelerator cards (GPU, FPGA, TPU) are 
> predominant compared to CPUs. To make the current Spark architecture work 
> with accelerator cards, Spark itself should understand the existence of 
> accelerators and know how to schedule tasks onto the executors where 
> accelerators are equipped.
> Spark's current scheduler schedules tasks based on the locality of the data 
> plus the availability of CPUs. This will introduce some problems when 
> scheduling tasks that require accelerators.
>  # CPU cores usually outnumber accelerators on one node, so using CPU cores 
> to schedule accelerator-required tasks will introduce a mismatch.
>  # In one cluster, we always assume that a CPU is equipped in each node, but 
> this is not true of accelerator cards.
>  # The existence of heterogeneous tasks (accelerator required or not) 
> requires the scheduler to schedule tasks in a smart way.
> So here we propose to improve the current scheduler to support heterogeneous 
> tasks (accelerator required or not). This can be part of the work of Project 
> Hydrogen.
> Details are attached in a Google doc. It doesn't cover all the implementation 
> details, just highlights the parts that should be changed.
>  
> CC [~yanboliang] [~merlintang]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26706:

Priority: Critical  (was: Major)

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Critical
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-----+
> |value|
> +-----+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-----+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai reassigned SPARK-26706:
---

Assignee: Anton Okolnychyi

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Assignee: Anton Okolnychyi
>Priority: Major
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-----+
> |value|
> +-----+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-----+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint 
> as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26704) docker-image-tool.sh should copy custom Dockerfiles into the build context for inclusion in images

2019-01-23 Thread Rob Vesse (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Vesse resolved SPARK-26704.
---
Resolution: Not A Problem

> docker-image-tool.sh should copy custom Dockerfiles into the build context 
> for inclusion in images
> --
>
> Key: SPARK-26704
> URL: https://issues.apache.org/jira/browse/SPARK-26704
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Priority: Major
>
> As surfaced in the discussion on the PR for SPARK-26687 
> (https://github.com/apache/spark/pull/23613) when using custom Dockerfiles 
> these are not copied into the build context.  Rather the build context 
> includes the default Dockerfiles from Spark regardless of what Dockerfiles 
> the end user actually used to build the images.
> The suggestion in the PR was that the script should copy in the custom 
> Dockerfiles over the stock  Dockerfiles.  This potentially aids in 
> reproducing the images later because someone with an image can get the exact 
> Dockerfile used to build that image.
> A related issue is that the script allows for and even in some cases 
> implicitly uses Docker build arguments as part of building the images.  In 
> the case where build arguments are used these should probably also be 
> captured in the image to aid reproducibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26704) docker-image-tool.sh should copy custom Dockerfiles into the build context for inclusion in images

2019-01-23 Thread Rob Vesse (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750366#comment-16750366
 ] 

Rob Vesse commented on SPARK-26704:
---

Yes, sorry, I was conflating the build context with the image contents. So 
you're correct, there isn't anything to do here. Will close as Not a Problem.

> docker-image-tool.sh should copy custom Dockerfiles into the build context 
> for inclusion in images
> --
>
> Key: SPARK-26704
> URL: https://issues.apache.org/jira/browse/SPARK-26704
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Priority: Major
>
> As surfaced in the discussion on the PR for SPARK-26687 
> (https://github.com/apache/spark/pull/23613) when using custom Dockerfiles 
> these are not copied into the build context.  Rather the build context 
> includes the default Dockerfiles from Spark regardless of what Dockerfiles 
> the end user actually used to build the images.
> The suggestion in the PR was that the script should copy in the custom 
> Dockerfiles over the stock  Dockerfiles.  This potentially aids in 
> reproducing the images later because someone with an image can get the exact 
> Dockerfile used to build that image.
> A related issue is that the script allows for and even in some cases 
> implicitly uses Docker build arguments as part of building the images.  In 
> the case where build arguments are used these should probably also be 
> captured in the image to aid reproducibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26704) docker-image-tool.sh should copy custom Dockerfiles into the build context for inclusion in images

2019-01-23 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750337#comment-16750337
 ] 

Marcelo Vanzin commented on SPARK-26704:


bq. they will be present and thus packaged into the image

They'll be present in the build context, but that doesn't mean they end up in 
the image. IIRC they do not.

> docker-image-tool.sh should copy custom Dockerfiles into the build context 
> for inclusion in images
> --
>
> Key: SPARK-26704
> URL: https://issues.apache.org/jira/browse/SPARK-26704
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Priority: Major
>
> As surfaced in the discussion on the PR for SPARK-26687 
> (https://github.com/apache/spark/pull/23613) when using custom Dockerfiles 
> these are not copied into the build context.  Rather the build context 
> includes the default Dockerfiles from Spark regardless of what Dockerfiles 
> the end user actually used to build the images.
> The suggestion in the PR was that the script should copy in the custom 
> Dockerfiles over the stock  Dockerfiles.  This potentially aids in 
> reproducing the images later because someone with an image can get the exact 
> Dockerfile used to build that image.
> A related issue is that the script allows for and even in some cases 
> implicitly uses Docker build arguments as part of building the images.  In 
> the case where build arguments are used these should probably also be 
> captured in the image to aid reproducibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26704) docker-image-tool.sh should copy custom Dockerfiles into the build context for inclusion in images

2019-01-23 Thread Rob Vesse (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750323#comment-16750323
 ] 

Rob Vesse commented on SPARK-26704:
---

For me it's a question of build reproducibility (I've been following an 
interesting discussion around this on legal-discuss - 
https://lists.apache.org/thread.html/d578819f1afa6b8fb697ea72083e0fb05e43938a23d6e7bb804069b8@%3Clegal-discuss.apache.org%3E).
If I crack open the image and start poking around and find a Dockerfile 
present, do I have a reasonable expectation that the Dockerfile I find there is 
the one used to build the image?

If yes, then we should ensure we include the correct Dockerfiles in the build 
context and thus the image.

If no, then we should probably not bother including the Dockerfiles at all.  
However, since, as you point out, they will be present and thus packaged into 
the image when building from a Spark release distribution, I suspect we want 
to continue doing this even for developer builds.

> docker-image-tool.sh should copy custom Dockerfiles into the build context 
> for inclusion in images
> --
>
> Key: SPARK-26704
> URL: https://issues.apache.org/jira/browse/SPARK-26704
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Priority: Major
>
> As surfaced in the discussion on the PR for SPARK-26687 
> (https://github.com/apache/spark/pull/23613) when using custom Dockerfiles 
> these are not copied into the build context.  Rather the build context 
> includes the default Dockerfiles from Spark regardless of what Dockerfiles 
> the end user actually used to build the images.
> The suggestion in the PR was that the script should copy in the custom 
> Dockerfiles over the stock  Dockerfiles.  This potentially aids in 
> reproducing the images later because someone with an image can get the exact 
> Dockerfile used to build that image.
> A related issue is that the script allows for and even in some cases 
> implicitly uses Docker build arguments as part of building the images.  In 
> the case where build arguments are used these should probably also be 
> captured in the image to aid reproducibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26379) Structured Streaming - Exception on adding column to Dataset

2019-01-23 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26379:
--
Affects Version/s: 3.0.0

> Structured Streaming - Exception on adding column to Dataset
> 
>
> Key: SPARK-26379
> URL: https://issues.apache.org/jira/browse/SPARK-26379
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0, 3.0.0
>Reporter: Kailash Gupta
>Priority: Major
>
> While using withColumn to add a column to a structured streaming Dataset, I 
> am getting the following exception: 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: 'timestamp
> Following is the sample code:
> {code:java}
> final String path = "path_to_input_directory";
> final StructType schema = new StructType(new StructField[] { new 
> StructField("word", StringType, false, Metadata.empty()), new 
> StructField("count", DataTypes.IntegerType, false, Metadata.empty()) });
> SparkSession sparkSession = 
> SparkSession.builder().appName("StructuredStreamingIssue").master("local").getOrCreate();
> Dataset<Row> words = sparkSession.readStream().option("sep", 
> ",").schema(schema).csv(path);
> Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", 
> functions.current_timestamp());
> // wordsWithTimestamp.explain(true);
> StreamingQuery query = 
> wordsWithTimestamp.writeStream().outputMode("update").option("truncate", 
> "false").format("console").trigger(Trigger.ProcessingTime("2 
> seconds")).start();
> query.awaitTermination();{code}
> Following are the contents of the file present at _path_
> {code:java}
> a,2
> c,4
> d,2
> r,1
> t,9
> {code}
> This seems to work with the 2.2.0 release, but not with 2.3.0 and 2.4.0.
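For anyone reproducing this from the shell, the sketch below is a Scala equivalent of the
Java sample above (same placeholder path, assuming a spark-shell style session where
{{spark}} is already defined):

{code:scala}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

// Placeholder path, as in the Java sample.
val schema = StructType(Seq(
  StructField("word", StringType, nullable = false),
  StructField("count", IntegerType, nullable = false)))

val words = spark.readStream.option("sep", ",").schema(schema).csv("path_to_input_directory")
val wordsWithTimestamp = words.withColumn("timestamp", current_timestamp())

wordsWithTimestamp.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
  .awaitTermination()
{code}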



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26707) Insert into table with single struct column fails

2019-01-23 Thread Bruce Robbins (JIRA)
Bruce Robbins created SPARK-26707:
-

 Summary: Insert into table with single struct column fails
 Key: SPARK-26707
 URL: https://issues.apache.org/jira/browse/SPARK-26707
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0, 2.3.2, 2.2.3, 3.0.0
Reporter: Bruce Robbins


This works:
{noformat}
scala> sql("select named_struct('d1', 123) c1, 12 
c2").write.format("parquet").saveAsTable("structtbl2")

scala> sql("show create table structtbl2").show(truncate=false)
+---+
|createtab_stmt |
+---+
|CREATE TABLE `structtbl2` (`c1` STRUCT<`d1`: INT>, `c2` INT)
USING parquet
|
+---+

scala> sql("insert into structtbl2 values (struct(789), 17)")
res2: org.apache.spark.sql.DataFrame = []

scala> sql("select * from structtbl2").show
+-----+---+
|   c1| c2|
+-----+---+
|[789]| 17|
|[123]| 12|
+-----+---+
scala>
{noformat}
However, if the table's only column is the struct column, the insert does not 
work:
{noformat}
scala> sql("select named_struct('d1', 123) 
c1").write.format("parquet").saveAsTable("structtbl1")

scala> sql("show create table structtbl1").show(truncate=false)
+-+
|createtab_stmt   |
+-+
|CREATE TABLE `structtbl1` (`c1` STRUCT<`d1`: INT>)
USING parquet
|
+-+

scala> sql("insert into structtbl1 values (struct(789))")
org.apache.spark.sql.AnalysisException: cannot resolve '`col1`' due to data 
type mismatch: cannot cast int to struct;;
'InsertIntoHadoopFsRelationCommand 
file:/Users/brobbins/github/spark_upstream/spark-warehouse/structtbl1, false, 
Parquet, Map(path -> 
file:/Users/brobbins/github/spark_upstream/spark-warehouse/structtbl1), Append, 
CatalogTable(
...etc...
{noformat}
I can work around it by using a named_struct as the value:
{noformat}
scala> sql("insert into structtbl1 values (named_struct('d1',789))")
res7: org.apache.spark.sql.DataFrame = []

scala> sql("select * from structtbl1").show
+-----+
|   c1|
+-----+
|[789]|
|[123]|
+-----+

scala>
{noformat}
My guess is that I just don't understand how structs work. But maybe this is a 
bug.
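One more workaround sketch, using the table and field names from above: appending through
the DataFrame writer avoids the INSERT ... VALUES resolution path entirely. This is only a
sketch of an alternative, not a fix:

{code:scala}
// Sketch: append via the DataFrame writer rather than INSERT ... VALUES.
spark.sql("select named_struct('d1', 789) c1")
  .write.format("parquet").mode("append").saveAsTable("structtbl1")
{code}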



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26379) Structured Streaming - Exception on adding column to Dataset

2019-01-23 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750304#comment-16750304
 ] 

Dongjoon Hyun commented on SPARK-26379:
---

Thank you, [~kailashgupta1012] and [~kabhwan].

> Structured Streaming - Exception on adding column to Dataset
> 
>
> Key: SPARK-26379
> URL: https://issues.apache.org/jira/browse/SPARK-26379
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0, 3.0.0
>Reporter: Kailash Gupta
>Priority: Major
>
> While using withColumn to add a column to a structured streaming Dataset, I 
> am getting the following exception: 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: 'timestamp
> Following is the sample code:
> {code:java}
> final String path = "path_to_input_directory";
> final StructType schema = new StructType(new StructField[] { new 
> StructField("word", StringType, false, Metadata.empty()), new 
> StructField("count", DataTypes.IntegerType, false, Metadata.empty()) });
> SparkSession sparkSession = 
> SparkSession.builder().appName("StructuredStreamingIssue").master("local").getOrCreate();
> Dataset<Row> words = sparkSession.readStream().option("sep", 
> ",").schema(schema).csv(path);
> Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", 
> functions.current_timestamp());
> // wordsWithTimestamp.explain(true);
> StreamingQuery query = 
> wordsWithTimestamp.writeStream().outputMode("update").option("truncate", 
> "false").format("console").trigger(Trigger.ProcessingTime("2 
> seconds")).start();
> query.awaitTermination();{code}
> Following are the contents of the file present at _path_
> {code:java}
> a,2
> c,4
> d,2
> r,1
> t,9
> {code}
> This seems to work with the 2.2.0 release, but not with 2.3.0 and 2.4.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26379) Structured Streaming - Exception on adding column to Dataset

2019-01-23 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26379:
--
Affects Version/s: 2.3.1
   2.3.2
   2.4.0

> Structured Streaming - Exception on adding column to Dataset
> 
>
> Key: SPARK-26379
> URL: https://issues.apache.org/jira/browse/SPARK-26379
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Kailash Gupta
>Priority: Major
>
> While using withColumn to add a column to a structured streaming Dataset, I 
> am getting the following exception: 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: 'timestamp
> Following is the sample code:
> {code:java}
> final String path = "path_to_input_directory";
> final StructType schema = new StructType(new StructField[] { new 
> StructField("word", StringType, false, Metadata.empty()), new 
> StructField("count", DataTypes.IntegerType, false, Metadata.empty()) });
> SparkSession sparkSession = 
> SparkSession.builder().appName("StructuredStreamingIssue").master("local").getOrCreate();
> Dataset<Row> words = sparkSession.readStream().option("sep", 
> ",").schema(schema).csv(path);
> Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", 
> functions.current_timestamp());
> // wordsWithTimestamp.explain(true);
> StreamingQuery query = 
> wordsWithTimestamp.writeStream().outputMode("update").option("truncate", 
> "false").format("console").trigger(Trigger.ProcessingTime("2 
> seconds")).start();
> query.awaitTermination();{code}
> Following are the contents of the file present at _path_
> {code:java}
> a,2
> c,4
> d,2
> r,1
> t,9
> {code}
> This seems to work with the 2.2.0 release, but not with 2.3.0 and 2.4.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17914) Spark SQL casting to TimestampType with nanosecond results in incorrect timestamp

2019-01-23 Thread Chaitanya P Chandurkar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750292#comment-16750292
 ] 

Chaitanya P Chandurkar edited comment on SPARK-17914 at 1/23/19 6:20 PM:
-

I'm still seeing this issue in Spark 2.4.0 when using the from_json() function. 
For an ISO Zulu format datetime, it does not interpret the fractional seconds 
accurately after a certain number of digits. Every digit added after the 3rd 
fractional digit adds more seconds to the parsed datetime. For example, the 
datetime "2019-01-23T17:50:29.9991Z", when parsed using Spark's built-in 
from_json() function, results in "2019-01-23T17:50:38.991+" (note the number of 
seconds added).

 

If I'm not wrong, from_json() internally uses the Jackson JSON library. I'm not 
sure whether the bug is within that or within Spark.


was (Author: cchandurkar):
I'm still seeing this issue in Spark 2.4.0 when using from_json() function. In 
ISO Zulu format datetime, it is not interpreting the timezone accurately after 
certain number of digits. Every digit added after 3rd digit in the timestamp is 
adding up more seconds to the parsed datetime.  For example, This datetime: 
"2019-01-23T17:50:29.9991Z" when parsed using spark's build-in from_json() 
function results in "2019-01-23T17:50:38.991+" ( Note the number of seconds 
added )

> Spark SQL casting to TimestampType with nanosecond results in incorrect 
> timestamp
> -
>
> Key: SPARK-17914
> URL: https://issues.apache.org/jira/browse/SPARK-17914
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Oksana Romankova
>Assignee: Anton Okolnychyi
>Priority: Major
> Fix For: 2.2.0, 2.3.0
>
>
> In some cases when timestamps contain nanoseconds they will be parsed 
> incorrectly. 
> Examples: 
> "2016-05-14T15:12:14.0034567Z" -> "2016-05-14 15:12:14.034567"
> "2016-05-14T15:12:14.000345678Z" -> "2016-05-14 15:12:14.345678"
> The issue seems to be happening in DateTimeUtils.stringToTimestamp(). It 
> assumes that only 6 digit fraction of a second will be passed.
> With this being the case I would suggest either discarding nanoseconds 
> automatically, or throw an exception prompting to pre-format timestamps to 
> microsecond precision first before casting to the Timestamp.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17914) Spark SQL casting to TimestampType with nanosecond results in incorrect timestamp

2019-01-23 Thread Chaitanya P Chandurkar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750292#comment-16750292
 ] 

Chaitanya P Chandurkar commented on SPARK-17914:


I'm still seeing this issue in Spark 2.4.0 when using the from_json() function. 
For an ISO Zulu format datetime, it does not interpret the fractional seconds 
accurately after a certain number of digits. Every digit added after the 3rd 
fractional digit adds more seconds to the parsed datetime. For example, the 
datetime "2019-01-23T17:50:29.9991Z", when parsed using Spark's built-in 
from_json() function, results in "2019-01-23T17:50:38.991+" (note the number of 
seconds added).

> Spark SQL casting to TimestampType with nanosecond results in incorrect 
> timestamp
> -
>
> Key: SPARK-17914
> URL: https://issues.apache.org/jira/browse/SPARK-17914
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Oksana Romankova
>Assignee: Anton Okolnychyi
>Priority: Major
> Fix For: 2.2.0, 2.3.0
>
>
> In some cases when timestamps contain nanoseconds they will be parsed 
> incorrectly. 
> Examples: 
> "2016-05-14T15:12:14.0034567Z" -> "2016-05-14 15:12:14.034567"
> "2016-05-14T15:12:14.000345678Z" -> "2016-05-14 15:12:14.345678"
> The issue seems to be happening in DateTimeUtils.stringToTimestamp(). It 
> assumes that only 6 digit fraction of a second will be passed.
> With this being the case I would suggest either discarding nanoseconds 
> automatically, or throw an exception prompting to pre-format timestamps to 
> microsecond precision first before casting to the Timestamp.
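Until the parsing is fixed, a sketch of the pre-formatting suggested above is to trim the
fractional seconds to at most six digits before casting. The column name and sample value
below are illustrative, and a spark-shell style session is assumed:

{code:scala}
import org.apache.spark.sql.functions.{regexp_replace, to_timestamp}
import spark.implicits._

// Keep at most six fractional digits, then cast the string to a timestamp.
val df = Seq("2016-05-14T15:12:14.000345678Z").toDF("raw_ts")
val fixed = df.withColumn("ts",
  to_timestamp(regexp_replace($"raw_ts", "(\\.\\d{1,6})\\d*", "$1")))
{code}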



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17914) Spark SQL casting to TimestampType with nanosecond results in incorrect timestamp

2019-01-23 Thread Chaitanya P Chandurkar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750292#comment-16750292
 ] 

Chaitanya P Chandurkar edited comment on SPARK-17914 at 1/23/19 6:25 PM:
-

I'm still seeing this issue in Spark 2.4.0 when using the from_json() function. 
For an ISO Zulu format datetime, it does not interpret the fractional seconds 
accurately after a certain number of digits. Every digit added after the 3rd 
fractional digit adds more seconds to the parsed datetime. For example, the 
datetime "2019-01-23T17:50:29.9991Z", when parsed using Spark's built-in 
from_json() function, results in "2019-01-23T17:50:38.991+" (note the number of 
seconds added).

 

If I'm not wrong, from_json() internally uses the Jackson JSON library. I'm not 
sure whether the bug is within that or within Spark.

 
{code:scala}
// Create the schema used to parse the JSON
import org.apache.spark.sql.types._

val sc = StructType(
  StructField("date", TimestampType) :: Nil
){code}
{code:scala}
// Sample JSON parsing using the schema created above
import org.apache.spark.sql.functions.from_json
import spark.implicits._

Seq("""{"date": "2019-01-22T18:33:39.134232733Z"}""")
  .toDF("data")
  .withColumn("parsed", from_json($"data", sc))
{code}
This results in the date being "2019-01-24T07:50:51.733+" (note the difference 
of 2 days).


was (Author: cchandurkar):
I'm still seeing this issue in Spark 2.4.0 when using from_json() function. In 
ISO Zulu format datetime, it is not interpreting the timezone accurately after 
certain number of digits. Every digit added after 3rd digit in the timestamp is 
adding up more seconds to the parsed datetime.  For example, This datetime: 
"2019-01-23T17:50:29.9991Z" when parsed using spark's build-in from_json() 
function results in "2019-01-23T17:50:38.991+" ( Note the number of seconds 
added )

 

If I'm not wrong from_json() internally uses the Jackson JSON library. I'm not 
sure if the bug is within that or within spark.

> Spark SQL casting to TimestampType with nanosecond results in incorrect 
> timestamp
> -
>
> Key: SPARK-17914
> URL: https://issues.apache.org/jira/browse/SPARK-17914
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Oksana Romankova
>Assignee: Anton Okolnychyi
>Priority: Major
> Fix For: 2.2.0, 2.3.0
>
>
> In some cases when timestamps contain nanoseconds they will be parsed 
> incorrectly. 
> Examples: 
> "2016-05-14T15:12:14.0034567Z" -> "2016-05-14 15:12:14.034567"
> "2016-05-14T15:12:14.000345678Z" -> "2016-05-14 15:12:14.345678"
> The issue seems to be happening in DateTimeUtils.stringToTimestamp(). It 
> assumes that only 6 digit fraction of a second will be passed.
> With this being the case I would suggest either discarding nanoseconds 
> automatically, or throw an exception prompting to pre-format timestamps to 
> microsecond precision first before casting to the Timestamp.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26699) Dataset column output discrepancies

2019-01-23 Thread Praveena (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Praveena updated SPARK-26699:
-
Issue Type: Question  (was: Bug)

> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Question
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (meaning as standalone in Eclipse) with the 
> same parquet input files, the output is -
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, my output is as below -
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's appending WrappedArray :(
> I am using Apache Spark version 2.3.2 and the EMR version is 5.19.0. What 
> could be the reason for discrepancies in the output of certain table columns?
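One way to make the printed form independent of how the runtime materializes the nested
arrays is to render the column as JSON text before showing it. This is only a sketch; the
parquet path is a placeholder, the locations column name comes from the report, and a
spark-shell style session is assumed:

{code:scala}
import org.apache.spark.sql.functions.to_json

// Sketch only: placeholder input path.
val df = spark.read.parquet("/path/to/input")
df.select(to_json(df("locations")).as("locations_json")).show(truncate = false)
{code}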



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26704) docker-image-tool.sh should copy custom Dockerfiles into the build context for inclusion in images

2019-01-23 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750249#comment-16750249
 ] 

Marcelo Vanzin commented on SPARK-26704:


You mentioned in the discussion that there is no need to copy the docker files, 
so what is there to do here?

That custom build context is for developers, it's not for end users. When run 
from the Spark distro, the script should not be creating those contexts. I'd 
avoid trying to make that fancier than it needs to be.

> docker-image-tool.sh should copy custom Dockerfiles into the build context 
> for inclusion in images
> --
>
> Key: SPARK-26704
> URL: https://issues.apache.org/jira/browse/SPARK-26704
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Priority: Major
>
> As surfaced in the discussion on the PR for SPARK-26687 
> (https://github.com/apache/spark/pull/23613) when using custom Dockerfiles 
> these are not copied into the build context.  Rather the build context 
> includes the default Dockerfiles from Spark regardless of what Dockerfiles 
> the end user actually used to build the images.
> The suggestion in the PR was that the script should copy in the custom 
> Dockerfiles over the stock  Dockerfiles.  This potentially aids in 
> reproducing the images later because someone with an image can get the exact 
> Dockerfile used to build that image.
> A related issue is that the script allows for and even in some cases 
> implicitly uses Docker build arguments as part of building the images.  In 
> the case where build arguments are used these should probably also be 
> captured in the image to aid reproducibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-20162) Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)

2019-01-23 Thread Franco Bonazza (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Franco Bonazza updated SPARK-20162:
---
Comment: was deleted

(was:  

I can reproduce this error without using Avro, as you can see the DataFrame is 
fine and created with DecimalType(38, 10) but when passed to Dataset with 
.as[Thing] it busts. This you can paste in a spark-shell, tested with spark 
2.3.0
{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, Row}
val schema = StructType(Seq(StructField("foo", DecimalType(38, 10))))
val spark: SparkSession = 
SparkSession.builder().master("local").config("spark.ui.enabled", 
"false").config("spark.sql.shuffle.partitions", 2).appName("spark 
test").getOrCreate()
val rdd = spark.sparkContext.makeRDD(Seq(Row(BigDecimal(10))))
case class Thing(foo: BigDecimal)
import spark.implicits._
spark.createDataFrame(rdd, schema).as[Thing]
{code}
 )

> Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)
> -
>
> Key: SPARK-20162
> URL: https://issues.apache.org/jira/browse/SPARK-20162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Miroslav Spehar
>Priority: Major
>
> While reading data from MySQL, type conversion doesn't work for Decimal type 
> when the decimal in database is of lower precision/scale than the one spark 
> expects.
> Error:
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up 
> cast `DECIMAL_AMOUNT` from decimal(30,6) to decimal(38,18) as it may truncate
> The type path of the target object is:
> - field (class: "org.apache.spark.sql.types.Decimal", name: "DECIMAL_AMOUNT")
> - root class: "com.misp.spark.Structure"
> You can either add an explicit cast to the input data or choose a higher 
> precision type of the field in the target object;
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2119)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2141)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$11.apply(TreeNode.scala:360)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:358)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionDown$1(QueryPlan.scala:248)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:258)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:236)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2132)
>   at 
> 

[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2019-01-23 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750223#comment-16750223
 ] 

Gabor Somogyi commented on SPARK-26389:
---

Good to hear that it's working with HDFS.

Prohibiting the user from using temp checkpoints when their frameworks are 
non-local is not a good idea because, for example, the console sink uses them.
I think it's a valid use-case to print to the console even in cluster mode with 
several nodes.

What we can do is add a message which explains that a temp checkpoint is used 
and what its consequences are.
Maybe a force clean-up flag for temp checkpoints could be introduced, but I have 
to think about it a bit more...
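As a side note for anyone hitting this, relying on the executor-local temp checkpoint can
be avoided by pointing checkpointLocation at a durable path. The sketch below uses a
placeholder rate source and HDFS path, assuming a spark-shell style session:

{code:scala}
// Sketch only: placeholder streaming source and checkpoint path.
val df = spark.readStream.format("rate").load()

val query = df.writeStream
  .format("console")
  .outputMode("update")
  .option("checkpointLocation", "hdfs:///user/myapp/checkpoints/console-sink")
  .start()
{code}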


> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on executor not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24417) Build and Run Spark on JDK11

2019-01-23 Thread Michael Atef (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750208#comment-16750208
 ] 

Michael Atef edited comment on SPARK-24417 at 1/23/19 4:51 PM:
---

Hello,

I am facing problems with PySpark after moving to JDK 11. Is this a known issue 
with the py4j version in PySpark 2.4.x? If so, when will Spark support JDK 11 
(especially PySpark)?

 

Attaching the stack trace:

 
{code:java}
Traceback (most recent call last):
  File 
"/Applications/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
  File 
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o58.describe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at 
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.aggResult$lzycompute$1(StatFunctions.scala:273)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.org$apache$spark$sql$execution$stat$StatFunctions$$aggResult$1(StatFunctions.scala:273)
at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$summary$2.apply$mcVI$sp(StatFunctions.scala:286)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.summary(StatFunctions.scala:285)
at org.apache.spark.sql.Dataset.summary(Dataset.scala:2534)
at org.apache.spark.sql.Dataset.describe(Dataset.scala:2473)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 

[jira] [Commented] (SPARK-24417) Build and Run Spark on JDK11

2019-01-23 Thread Michael Atef (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750208#comment-16750208
 ] 

Michael Atef commented on SPARK-24417:
--

Hello,

I am facing problems with PySpark after moving to JDK 11. Is this a known issue 
with the py4j version in PySpark 2.4.x?

 

Attaching the stack trace:

 
{code:java}
Traceback (most recent call last):
  File 
"/Applications/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
  File 
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o58.describe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at 
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.aggResult$lzycompute$1(StatFunctions.scala:273)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.org$apache$spark$sql$execution$stat$StatFunctions$$aggResult$1(StatFunctions.scala:273)
at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$summary$2.apply$mcVI$sp(StatFunctions.scala:286)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.spark.sql.execution.stat.StatFunctions$.summary(StatFunctions.scala:285)
at org.apache.spark.sql.Dataset.summary(Dataset.scala:2534)
at org.apache.spark.sql.Dataset.describe(Dataset.scala:2473)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at 

[jira] [Resolved] (SPARK-25101) Creating leaderLatch with id for getting info about spark master nodes from zk

2019-01-23 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-25101.
---
Resolution: Won't Fix

> Creating leaderLatch with id for getting info about spark master nodes from zk
> --
>
> Key: SPARK-25101
> URL: https://issues.apache.org/jira/browse/SPARK-25101
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Yuliya
>Priority: Major
>
> Sometimes Spark master nodes start faster than ZooKeeper => all my masters 
> are in STANDBY status. To solve this problem I check ZK for info about the 
> registered leaderLatch objects, but currently Spark creates the 
> leaderLatch without an id (ZooKeeperLeaderElectionAgent:41), so I have no 
> way to get info about the registered leaderLatch for a Spark master node.
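For reference, the sketch below shows what the request amounts to when using Curator
directly: a LeaderLatch created with an explicit id, so the registered masters can be
listed from ZooKeeper. The connection string, latch path, and id are illustrative, not the
values Spark actually uses:

{code:scala}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

// Sketch only: all literals are illustrative.
val zk = CuratorFrameworkFactory.newClient("zk1:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()
val latch = new LeaderLatch(zk, "/spark/leader_election", "master-1")
latch.start()
{code}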



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750174#comment-16750174
 ] 

Thomas Graves commented on SPARK-26413:
---

Just a note: I think this overlaps with 
https://issues.apache.org/jira/browse/SPARK-24579

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured 
> data.  This is already true in Spark with the use of Arrow in the Pandas UDF 
> functions in the DataFrame API.
> However, the current implementation of Arrow in Spark is limited to two use 
> cases.
>  * Pandas UDF that allows for operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a 
> Pandas Dataframe.
> What is still hard however is making use of all of the columns in a Dataframe 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a dataframe. 
> However pickling is very slow and the collecting is expensive.
> The proposal is to extend Spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology.  
> Some examples of possibilities with this new API: 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Target users and personas
> ML practitioners, data scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Interable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to an Dataframe by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): Dataframe
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
> h3. Serializers.py
> {code:java}
> # Serializer that takes a Serialized Arrow Tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer)
> {code}
> h3. RDD.py
> {code}
> # New RDD Class that has an RDD[ArrowTable] behind it and uses the new 
> ArrowSerializer instead of the normal Pickle Serializer
> class ArrowRDD(RDD){code}
>  
> h3. Dataframe.py
> {code}
> // New Function that converts a pyspark dataframe into an ArrowRDD
> def arrow(self):
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> df.arrow.map(map_table).dataframe 
> {code}
> h3. Scala
>  
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries.  This however does come 
> with some considerations from a Spark perspective.
>  Arrow is column based instead of Row based.  In the above API proposal of 
> RDD[ArrowTable] each RDD row will in fact be a block of data.  Another 
> proposal in this regard is to introduce a new parameter to Spark called 
> arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is 
> to decide how many records are included in a single Arrow Table.  If set to 
> -1 the entire partition will be included in the table else to that number. 
> Within that number 

[jira] [Assigned] (SPARK-26649) Noop Streaming Sink using DSV2

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26649:


Assignee: Apache Spark

> Noop Streaming Sink using DSV2
> --
>
> Key: SPARK-26649
> URL: https://issues.apache.org/jira/browse/SPARK-26649
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Xiao Li
>Assignee: Apache Spark
>Priority: Major
>
> Extend this noop data source to support a streaming sink that ignores all the 
> elements.  
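For illustration, usage could look like the sketch below. The "noop" format name is an
assumption mirroring the existing batch noop source, and the rate source is just a
placeholder input:

{code:scala}
import org.apache.spark.sql.streaming.Trigger

// Sketch of the intended usage only; the streaming "noop" sink does not exist yet.
val input = spark.readStream.format("rate").load()

val query = input.writeStream
  .format("noop")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
{code}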



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26649) Noop Streaming Sink using DSV2

2019-01-23 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26649:


Assignee: (was: Apache Spark)

> Noop Streaming Sink using DSV2
> --
>
> Key: SPARK-26649
> URL: https://issues.apache.org/jira/browse/SPARK-26649
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Xiao Li
>Priority: Major
>
> Extend this noop data source to support a streaming sink that ignores all the 
> elements.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26699) Dataset column output discrepancies

2019-01-23 Thread Praveena (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Praveena updated SPARK-26699:
-
Description: 
Hi,

 

When I run my job in local mode (meaning as standalone in Eclipse) with the 
same parquet input files, the output is -

 

locations
 
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 null
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...

 

But when I run the same code base with the same input parquet files in YARN 
cluster mode, my output is as below -


 locations
 
 [*WrappedArray*([tr...
 [*WrappedArray*([tr...
 [WrappedArray([tr...
 null
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...

It's appending WrappedArray :(

I am using Apache Spark version 2.3.2 and the EMR version is 5.19.0. What could 
be the reason for discrepancies in the output of certain table columns?

  was:
Hi,

 

When i run my job in Local mode (meaning as standalone in Eclipse) with same 
parquet input files, the output is -

 

locations
 
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 null
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...

 

But when i run the same code base with same input parquet files in the YARN 
cluster mode, my output is as below -


 locations
 
 [*WrappedArray*([tr...
 [*WrappedArray*([tr...
 [WrappedArray([tr...
 null
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...

Its appending WrappedArray :(

I am using Apache Spark 2.3.2 and the EMR version of the cluster is 5.19.0. 
What could be the reason for the discrepancies in the output of certain 
table columns?


> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (i.e. standalone inside Eclipse) with the same 
> parquet input files, the output is:
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, the output is:
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's wrapping the values in WrappedArray.
> I am using Apache Spark 2.3.2 and the EMR version is 5.19.0. What could 
> be the reason for the discrepancies in the output of certain table columns?
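As a hedged, self-contained illustration (independent of the reporter's parquet 
data), the sketch below shows that array columns come back as Scala Seq values 
when collected, so comparing collected values rather than show() output sidesteps 
the WrappedArray rendering difference; the toy data and object name are made up:

{code:scala}
import org.apache.spark.sql.SparkSession

// Toy example: array columns are exposed as scala.collection.Seq when collected,
// whether or not the runtime renders them as WrappedArray in show().
object ArrayRenderingCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("array-rendering-check")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Seq("a", "b"), Seq("c")).toDF("locations")

    // The rendered string of the array column may differ between environments...
    df.show()

    // ...but the collected values are plain Seqs either way.
    val collected: Array[Seq[String]] = df.collect().map(_.getSeq[String](0))
    collected.foreach(seq => println(seq.mkString("[", ", ", "]")))

    spark.stop()
  }
}
{code}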



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26699) Dataset column output discrepancies

2019-01-23 Thread Praveena (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Praveena updated SPARK-26699:
-
Description: 
Hi,

 

When I run my job in local mode (i.e. standalone inside Eclipse) with the same 
parquet input files, the output is:

 

locations
 
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 null
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...
 [[[true, [[, phys...

 

But when I run the same code base with the same input parquet files in YARN 
cluster mode, the output is:


 locations
 
 [*WrappedArray*([tr...
 [*WrappedArray*([tr...
 [WrappedArray([tr...
 null
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...
 [WrappedArray([tr...

It's wrapping the values in WrappedArray.

I am using Apache Spark 2.3.2 and the EMR version of the cluster is 5.19.0. 
What could be the reason for the discrepancies in the output of certain 
table columns?

  was:
Hi,

 

When I run my job in local mode with the same parquet input files, the output is:

 

locations

[[[true, [[, phys...
[[[true, [[, phys...
[[[true, [[, phys...
 null
[[[true, [[, phys...
[[[true, [[, phys...
[[[true, [[, phys...
[[[true, [[, phys...
[[[true, [[, phys...
[[[true, [[, phys...

 

But when I run the same code base with the same input parquet files in YARN 
cluster mode, the output is:


 locations

[*WrappedArray*([tr...
[*WrappedArray*([tr...
[WrappedArray([tr...
 null
[WrappedArray([tr...
[WrappedArray([tr...
[WrappedArray([tr...
[WrappedArray([tr...
[WrappedArray([tr...
[WrappedArray([tr...

It's wrapping the values in WrappedArray.

I am using Apache Spark 2.3.2 and the EMR version of the cluster is 5.19.0. 
What could be the reason for the discrepancies in the output of certain 
table columns?


> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (i.e. standalone inside Eclipse) with the same 
> parquet input files, the output is:
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, the output is:
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's wrapping the values in WrappedArray.
> I am using Apache Spark 2.3.2 and the EMR version of the cluster is 5.19.0. 
> What could be the reason for the discrepancies in the output of certain 
> table columns?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26689) Bad disk causing broadcast failure

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750125#comment-16750125
 ] 

Thomas Graves commented on SPARK-26689:
---

Can you add more details about your setup?  Which resource manager were you 
running?  

> Bad disk causing broadcast failure
> --
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Mutliple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster that was caused 
> by bad disk problems; a single bad disk can make the whole application fail.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the application still fails. I think 
> it's because Spark does not currently handle bad disks in `DiskBlockManager`. 
> In a multi-disk environment we could tolerate a bad disk and avoid 
> the application failure.
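For illustration only, a sketch of the suggested fallback; the object and method 
names are assumptions and this is not Spark's actual DiskBlockManager code:

{code:scala}
import java.io.{File, IOException}
import scala.util.{Success, Try}

// Sketch of the fallback idea: hash the block id to a starting local dir and,
// if creating the sub-directory there fails, rotate through the remaining
// local dirs before giving up.
object FallbackLocalDirs {
  def getFileWithFallback(localDirs: Seq[File], blockId: String): File = {
    require(localDirs.nonEmpty, "no local dirs configured")
    val n = localDirs.length
    val start = ((blockId.hashCode % n) + n) % n
    val candidates = localDirs.drop(start) ++ localDirs.take(start)

    val found = candidates.view.map { dir =>
      Try {
        val sub = new File(dir, blockId.take(2))   // e.g. the "3b" sub-dir in the trace above
        if (!sub.exists() && !sub.mkdirs()) {
          throw new IOException(s"Failed to create local dir in $sub")
        }
        new File(sub, blockId)
      }
    }.collectFirst { case Success(f) => f }

    found.getOrElse(
      throw new IOException(s"All $n local dirs are bad for block $blockId"))
  }
}
{code}

A real fix would also have to remember which dirs have failed so that later 
blocks skip them instead of retrying every time.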



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


