[jira] [Created] (SPARK-22605) OutputMetrics empty for DataFrame writes
Jason White created SPARK-22605: --- Summary: OutputMetrics empty for DataFrame writes Key: SPARK-22605 URL: https://issues.apache.org/jira/browse/SPARK-22605 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Jason White Priority: Minor I am trying to use the SparkListener interface to hook up some custom monitoring for some of our critical jobs. Among the first metrics I would like are output row and byte counts. I'm using PySpark and the Py4J interface to implement the listener. I am able to see the recordsRead and bytesRead metrics via the taskEnd.taskMetrics().inputMetrics().recordsRead() and .bytesRead() methods. taskEnd.taskMetrics().outputMetrics().recordsWritten() and .bytesWritten() are always 0. I see similar output if I use the stageCompleted event instead. To trigger execution, I am using df.write.parquet(path). If I use df.rdd.saveAsTextFile(path) instead, the counts and bytes are correct. Another clue that this bug is deeper in Spark SQL is that the Spark Application Master doesn't show the Output Size / Records column with df.write.parquet or df.write.text, but does with df.rdd.saveAsTextFile. Since the Spark Application Master also gets its output via the Listener interface, this would seem related. There is a related issue: https://issues.apache.org/jira/browse/SPARK-21882, but I believe this to be distinct. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19950) nullable ignored when df.load() is executed for file-based data source
[ https://issues.apache.org/jira/browse/SPARK-19950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934719#comment-15934719 ] Jason White commented on SPARK-19950: - Without something that allows us to read using the nullability as it exists on disk, we end up doing: df = spark.read.parquet(path) return spark.createDataFrame(df.rdd, schema) Which is obviously not desirable. We would much rather rely on the schema as defined by the file format (Parquet in our case), or rely on a user-supplied schema. Preferably both. > nullable ignored when df.load() is executed for file-based data source > -- > > Key: SPARK-19950 > URL: https://issues.apache.org/jira/browse/SPARK-19950 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Kazuaki Ishizaki > > This problem is reported in [Databricks > forum|https://forums.databricks.com/questions/7123/nullable-seemingly-ignored-when-reading-parquet.html]. > When we execute the following code, a schema for "id" in {{dfRead}} has > {{nullable = true}}. It should be {{nullable = false}}. > {code:java} > val field = "id" > val df = spark.range(0, 5, 1, 1).toDF(field) > val fmt = "parquet" > val path = "/tmp/parquet" > val schema = StructType(Seq(StructField(field, LongType, false))) > df.write.format(fmt).mode("overwrite").save(path) > val dfRead = spark.read.format(fmt).schema(schema).load(path) > dfRead.printSchema > {code}
[jira] [Created] (SPARK-19561) Pyspark Dataframes don't allow timestamps near epoch
Jason White created SPARK-19561: --- Summary: Pyspark Dataframes don't allow timestamps near epoch Key: SPARK-19561 URL: https://issues.apache.org/jira/browse/SPARK-19561 Project: Spark Issue Type: Bug Components: PySpark, SQL Affects Versions: 2.1.0, 2.0.1 Reporter: Jason White Pyspark does not allow timestamps at or near the epoch to be created in a DataFrame. Related issue: https://issues.apache.org/jira/browse/SPARK-19299 TimestampType.toInternal converts a datetime object to a number representing microseconds since the epoch. For all times more than 2148 seconds before or after 1970-01-01T00:00:00+00:00, the magnitude of this number is greater than 2^31 and Py4J automatically serializes it as a long. However, for times within this range (~35 minutes before or after the epoch), Py4J serializes it as an int. When creating the object on the Scala side, ints are not recognized and the value goes to null. This leads to null values in non-nullable fields, and corrupted Parquet files. The solution is trivial: force TimestampType.toInternal to always return a long.
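The 2^31 boundary described above can be illustrated in plain Python. This is a sketch of roughly what TimestampType.toInternal computes, not the actual PySpark source; datetimes are pinned to UTC here for clarity.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_internal(dt):
    # Microseconds since the epoch, roughly what TimestampType.toInternal
    # returns; forcing the result to a long is the proposed fix.
    return int((dt - EPOCH).total_seconds() * 10**6)

# 2147 seconds after the epoch: fits in a signed 32-bit int, so Py4J
# (under Python 2) would serialize it as a Java int, which the Scala
# side does not recognize -- the value becomes null.
near = to_internal(datetime(1970, 1, 1, 0, 35, 47, tzinfo=timezone.utc))

# 2148 seconds after the epoch: exceeds 2**31, serialized as a Java long.
far = to_internal(datetime(1970, 1, 1, 0, 35, 48, tzinfo=timezone.utc))

assert abs(near) < 2**31   # 2_147_000_000
assert abs(far) >= 2**31   # 2_148_000_000
```

This matches the symptom reported below in SPARK-19299: timestamps up to 19:35:47 Eastern (2147 seconds after the epoch) come back as None, while 19:35:48 works.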
[jira] [Comment Edited] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet
[ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832195#comment-15832195 ] Jason White edited comment on SPARK-19299 at 1/20/17 6:14 PM: -- These seem like two or three separate issues. - Python long doesn't fit into Scala long, resulting in Null even in a non-nullable field - Python datetime near the epoch doesn't fit ( ? ) into a Scala timestamp, resulting in Null even in a non-nullable field - DataFrame serialization to disk assumes, but doesn't verify, no nulls exist in non-nullable fields. This assumption is obviously fragile. (Also tested with JSON serialization - the column simply disappears in the JSON output for that row) was (Author: jason.white): These seem like two or three separate issues. - Python long doesn't fit into Scala long, resulting in Null even in a non-nullable field - Python datetime near the epoch doesn't fit ( ? ) into a Scala timestamp, resulting in Null even in a non-nullable field - DataFrame serialization to disk doesn't assumes, but doesn't verify, no nulls exist in non-nullable fields. This assumption is obviously fragile. (Also tested with JSON serialization - the column simply disappears in the JSON output for that row) > Nulls in non nullable columns causes data corruption in parquet > --- > > Key: SPARK-19299 > URL: https://issues.apache.org/jira/browse/SPARK-19299 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Franklyn Dsouza >Priority: Critical > > The problem we're seeing is that if a null occurs in a no-nullable field and > is written down to parquet the resulting file gets corrupt and can not be > read back correctly. > One way that this can occur is when a long value in python is too big to fit > into a sql LongType it gets cast to null. 
> We're also seeing that the behaviour is different depending on whether or not > the vectorized reader is enabled. > Here's an example in PySpark > {code} > from datetime import datetime > from pyspark.sql import types > data = [ > (1, 6), > (2, 7), > (3, 2 ** 64), # value overflows sql LongType > (4, 8), > (5, 9) > ] > schema = types.StructType([ > types.StructField("index", types.LongType(), False), > types.StructField("long", types.LongType(), False), > ]) > df = sc.sql.createDataFrame(data, schema) > df.collect() > df.write.parquet("corrupt_parquet") > df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet") > df_parquet.collect() > {code} > with the vectorized reader enabled this causes > {code} > In [2]: df.collect() > Out[2]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=None), > Row(index=4, long=8), > Row(index=5, long=9)] > In [3]: df_parquet.collect() > Out[3]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=8), > Row(index=4, long=9), > Row(index=5, long=5)] > {code} > as you can see reading the data back from disk causes data to get shifted up > and between columns. > with the vectorized reader disabled we are completely unable to read the file. > {code} > Py4JJavaError: An error occurred while calling o143.collectToPython. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 > (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not > read value at 4 in block 0 in file > file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitio
[jira] [Commented] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet
[ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832195#comment-15832195 ] Jason White commented on SPARK-19299: - These seem like two or three separate issues. - Python long doesn't fit into Scala long, resulting in Null even in a non-nullable field - Python datetime near the epoch doesn't fit (?) into a Scala timestamp, resulting in Null even in a non-nullable field - DataFrame serialization to disk assumes, but doesn't verify, that no nulls exist in non-nullable fields. This assumption is obviously fragile. (Also tested with JSON serialization - the column simply disappears in the JSON output for that row) > Nulls in non nullable columns causes data corruption in parquet > --- > > Key: SPARK-19299 > URL: https://issues.apache.org/jira/browse/SPARK-19299 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Franklyn Dsouza > > The problem we're seeing is that if a null occurs in a non-nullable field and > is written down to parquet the resulting file gets corrupt and can not be > read back correctly. > One way that this can occur is when a long value in python is too big to fit > into a sql LongType it gets cast to null. > We're also seeing that the behaviour is different depending on whether or not > the vectorized reader is enabled. 
> Here's an example in PySpark > {code} > from datetime import datetime > from pyspark.sql import types > data = [ > (1, 6), > (2, 7), > (3, 2 ** 64), # value overflows sql LongType > (4, 8), > (5, 9) > ] > schema = types.StructType([ > types.StructField("index", types.LongType(), False), > types.StructField("long", types.LongType(), False), > ]) > df = sc.sql.createDataFrame(data, schema) > df.collect() > df.write.parquet("corrupt_parquet") > df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet") > df_parquet.collect() > {code} > with the vectorized reader enabled this causes > {code} > In [2]: df.collect() > Out[2]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=None), > Row(index=4, long=8), > Row(index=5, long=9)] > In [3]: df_parquet.collect() > Out[3]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=8), > Row(index=4, long=9), > Row(index=5, long=5)] > {code} > as you can see reading the data back from disk causes data to get shifted up > and between columns. > with the vectorized reader disabled we are completely unable to read the file. > {code} > Py4JJavaError: An error occurred while calling o143.collectToPython. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 > (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not > read value at 4 in block 0 in file > file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.con
[jira] [Commented] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet
[ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832006#comment-15832006 ] Jason White commented on SPARK-19299: - Also seeing this same behaviour in Spark 2.0.1 when creating a DataFrame with a timestamp at or near the epoch. My computer is in Eastern time, so 1969-12-31T19:00:00-0500 is unix timestamp 0. {code} >>> from datetime import datetime >>> dt = datetime(1969, 12, 31, 19, 0, 0) >>> from pyspark.sql import SQLContext >>> sql = SQLContext(sc) >>> from pyspark.sql.types import StructType, StructField, TimestampType >>> schema = StructType([StructField('ts', TimestampType(), False)]) >>> df = sql.createDataFrame([(dt,)], schema) >>> df.schema StructType(List(StructField(ts,TimestampType,false))) >>> df.collect() [Row(ts=None)] {code} Weirdly, this continues on for over half an hour after the epoch: {code} >>> dt = datetime(1969, 12, 31, 19, 35, 47) >>> df = sql.createDataFrame([(dt,)], schema) >>> df.collect() [Row(ts=None)] >>> dt = datetime(1969, 12, 31, 19, 35, 48) >>> df = sql.createDataFrame([(dt,)], schema) >>> df.collect() [Row(ts=datetime.datetime(1969, 12, 31, 19, 35, 48))] {code} > Nulls in non nullable columns causes data corruption in parquet > --- > > Key: SPARK-19299 > URL: https://issues.apache.org/jira/browse/SPARK-19299 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Franklyn Dsouza > > The problem we're seeing is that if a null occurs in a no-nullable field and > is written down to parquet the resulting file gets corrupt and can not be > read back correctly. > One way that this can occur is when a long value in python is too big to fit > into a sql LongType it gets cast to null. > We're also seeing that the behaviour is different depending on whether or not > the vectorized reader is enabled. 
> Here's an example in PySpark > {code} > from datetime import datetime > from pyspark.sql import types > data = [ > (1, 6), > (2, 7), > (3, 2 ** 64), # value overflows sql LongType > (4, 8), > (5, 9) > ] > schema = types.StructType([ > types.StructField("index", types.LongType(), False), > types.StructField("long", types.LongType(), False), > ]) > df = sc.sql.createDataFrame(data, schema) > df.collect() > df.write.parquet("corrupt_parquet") > df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet") > df_parquet.collect() > {code} > with the vectorized reader enabled this causes > {code} > In [2]: df.collect() > Out[2]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=None), > Row(index=4, long=8), > Row(index=5, long=9)] > In [3]: df_parquet.collect() > Out[3]: > [Row(index=1, long=6), > Row(index=2, long=7), > Row(index=3, long=8), > Row(index=4, long=9), > Row(index=5, long=5)] > {code} > as you can see reading the data back from disk causes data to get shifted up > and between columns. > with the vectorized reader disabled we are completely unable to read the file. > {code} > Py4JJavaError: An error occurred while calling o143.collectToPython. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 > (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not > read value at 4 in block 0 in file > file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapParti
[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python
[ https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592831#comment-15592831 ] Jason White commented on SPARK-10915: - At the moment, we use .repartitionAndSortWithinPartitions to give us a strictly ordered iterable that we can process one at a time. We don't have a Python list sitting in memory, instead we rely on ExternalSort to order in a memory-safe way. I don't yet have enough experience with DataFrames to know if we will have the same or similar problems there. It's possible that collect_list will perform better - I'll give that a try when we get there and report back on this ticket if it's a suitable approach for our use case. > Add support for UDAFs in Python > --- > > Key: SPARK-10915 > URL: https://issues.apache.org/jira/browse/SPARK-10915 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Reporter: Justin Uang > > This should support python defined lambdas.
[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python
[ https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592534#comment-15592534 ] Jason White commented on SPARK-10915: - That's unfortunate. Materializing a list somewhere is exactly what we're trying to avoid. The lists can get unpredictably long for some small number of keys, and this approach tends to cause us to blow by our memory ceiling, at least when using RDDs. It's why we don't use .groupByKey unless absolutely necessary. > Add support for UDAFs in Python > --- > > Key: SPARK-10915 > URL: https://issues.apache.org/jira/browse/SPARK-10915 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Reporter: Justin Uang > > This should support python defined lambdas.
[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python
[ https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591706#comment-15591706 ] Jason White commented on SPARK-10915: - We would also very much like Python UDAFs. In particular, we have some situations where value ordering matters, e.g. a state machine. .reduceByKey can't be used here (not associative), so we've come up with our own function .overByKey that makes use of .repartitionAndSortWithinPartitions, and applies a function to the sorted values for each key. We'd like to move more of our logic over to DataFrames and minimize the number of times we need to dive down into RDDs. This issue is one of the primary reasons we have to keep going back to RDDs. > Add support for UDAFs in Python > --- > > Key: SPARK-10915 > URL: https://issues.apache.org/jira/browse/SPARK-10915 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Reporter: Justin Uang > > This should support python defined lambdas.
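The pattern described in the comment above can be sketched in plain, single-process Python. The name `over_by_key` and its signature are assumptions mirroring the comment, not PySpark API; a local sort stands in for the repartition-and-sort shuffle.

```python
from itertools import groupby
from operator import itemgetter

def over_by_key(records, fn):
    # Local mimic of the described .overByKey: after a
    # repartitionAndSortWithinPartitions, each key's records arrive
    # contiguously and in sorted value order. fn then sees an ordered
    # iterable of values per key, so order-sensitive (non-associative)
    # aggregations such as a state machine work without first collecting
    # a whole per-key list in memory.
    ordered = sorted(records)  # stand-in for the sorted shuffle
    return {key: fn(v for _, v in group)
            for key, group in groupby(ordered, key=itemgetter(0))}

# Values arrive sorted within each key, so ordering-dependent logic is safe.
result = over_by_key([("a", 2), ("a", 1), ("b", 3)], list)
# result == {"a": [1, 2], "b": [3]}
```

Because `fn` receives a generator rather than a materialized list, this mirrors the memory-safe property the comment relies on from ExternalSort.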
[jira] [Created] (SPARK-17679) Remove unnecessary Py4J ListConverter patch
Jason White created SPARK-17679: --- Summary: Remove unnecessary Py4J ListConverter patch Key: SPARK-17679 URL: https://issues.apache.org/jira/browse/SPARK-17679 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.0.0 Reporter: Jason White Priority: Minor In SPARK-6949 davies documented a couple of bugs with Py4J that prevented Spark from registering a converter for date and datetime objects, patched in https://github.com/apache/spark/pull/5570. Specifically, https://github.com/bartdag/py4j/issues/160 dealt with ListConverter automatically converting bytearrays into ArrayList instead of leaving them alone. Py4J #160 has since been fixed upstream, in the 0.9 release, a couple of months after Spark #5570. According to spark-core's pom.xml, we're now using 0.10.3. We should remove this patch on ListConverter since the upstream package no longer has this issue.
[jira] [Created] (SPARK-14700) PySpark Row equality operator is not overridden
Jason White created SPARK-14700: --- Summary: PySpark Row equality operator is not overridden Key: SPARK-14700 URL: https://issues.apache.org/jira/browse/SPARK-14700 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.6.1 Reporter: Jason White The pyspark.sql.Row class doesn't override the equality operator. As a result, it falls back to the equality operator of its superclass, `tuple`. This is insufficient, as the order of the elements in the tuple is meant to be used in combination with the private `__fields__` member. This leads to difficulties in preparing proper unit tests for PySpark DataFrames, and to seemingly illogical conditions such as: Row(a=1) == Row(b=1) # True, since column names aren't considered r1 = Row('b', 'a')(2, 1) # Row(b=2, a=1) r1 == Row(b=2, a=1) # False, since kwargs are sorted alphabetically in the Row constructor r1 == Row(a=2, b=1) # True, since the tuple for each is (2, 1) Indeed, a few bugs in existing Spark code were exposed when I patched this. PR incoming.
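The tuple-equality behaviour can be reproduced with a minimal stand-in. This is a sketch, not the real pyspark.sql.Row; only the keyword-argument construction form is mimicked.

```python
class Row(tuple):
    # Minimal mimic: values are stored as a plain tuple ordered by the
    # alphabetically sorted kwargs, field names are kept in __fields__ --
    # and, like the real class before the fix, tuple.__eq__ is inherited
    # unchanged, so field names are never consulted during comparison.
    def __new__(cls, **kwargs):
        names = sorted(kwargs)
        row = tuple.__new__(cls, (kwargs[n] for n in names))
        row.__fields__ = names
        return row

assert Row(a=1) == Row(b=1)            # both are just the tuple (1,)
assert Row(a=1, b=2) == Row(b=2, a=1)  # kwargs sorted: both become (1, 2)
```

A fix along the lines described would override `__eq__` (and `__ne__`) to compare `__fields__` alongside the tuple values.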
[jira] [Created] (SPARK-12073) Backpressure causes individual Kafka partitions to lag
Jason White created SPARK-12073: --- Summary: Backpressure causes individual Kafka partitions to lag Key: SPARK-12073 URL: https://issues.apache.org/jira/browse/SPARK-12073 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.5.2 Reporter: Jason White We're seeing a growing lag on two individual Kafka partitions, on a topic with 32 partitions. Our individual batch sessions are completing in 5-7s, with a batch window of 30s, so there's plenty of room for Streaming to catch up, but it looks to be intentionally limiting itself. These partitions are experiencing unbalanced load (higher than most of the others). What I believe is happening is that maxMessagesPerPartition calculates an appropriate limit for the message rate from all partitions, and then divides by the number of partitions to determine how many messages to retrieve per partition. The problem with this approach is that when one partition is behind by millions of records (due to random Kafka issues) or is experiencing heavy load, the number of messages to be retrieved shouldn't be evenly split among the partitions. In this scenario, if the rate estimator calculates that only 100k total messages can be retrieved, each partition (out of say 32) retrieves at most 100k/32 = 3125 messages. Under some conditions, this results in the backpressure keeping the lagging partition from recovering. The PIDRateEstimator doesn't increase the number of messages to retrieve enough to recover, and we stabilize at a point where these individual partitions slowly grow. I have a PR in progress on our fork to allocate maxMessagesPerPartition by weighting the number to be retrieved by the current lag each partition is experiencing.
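The proposed weighting can be sketched as follows. The function names and the exact weighting formula are illustrative assumptions, not the contents of the actual PR.

```python
def even_split(total_messages, partitions):
    # Current behaviour: the global rate limit is divided evenly,
    # regardless of how far behind each partition is.
    return {p: total_messages // len(partitions) for p in partitions}

def lag_weighted_split(total_messages, lag_by_partition):
    # Proposed behaviour: weight each partition's share by its current
    # lag, so a partition that is millions of records behind can catch up.
    total_lag = sum(lag_by_partition.values()) or 1
    return {p: total_messages * lag // total_lag
            for p, lag in lag_by_partition.items()}

# One partition far behind, two nearly caught up, 100k-message budget:
lags = {"p0": 5_000_000, "p1": 10_000, "p2": 10_000}
even = even_split(100_000, lags)          # ~33,333 messages each
weighted = lag_weighted_split(100_000, lags)  # p0 gets nearly the whole budget
```

With 32 partitions the even split caps the lagging partition at 3125 messages per batch, as described above; the weighted split gives it almost the entire budget until its lag shrinks.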
[jira] [Commented] (SPARK-11437) createDataFrame shouldn't .take() when provided schema
[ https://issues.apache.org/jira/browse/SPARK-11437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14984009#comment-14984009 ] Jason White commented on SPARK-11437: - [~marmbrus] We briefly discussed this at SparkSummitEU this week. > createDataFrame shouldn't .take() when provided schema > -- > > Key: SPARK-11437 > URL: https://issues.apache.org/jira/browse/SPARK-11437 > Project: Spark > Issue Type: Improvement > Components: PySpark >Reporter: Jason White > > When creating a DataFrame from an RDD in PySpark, `createDataFrame` calls > `.take(10)` to verify the first 10 rows of the RDD match the provided schema. > Similar to https://issues.apache.org/jira/browse/SPARK-8070, but that issue > affected cases where a schema was not provided. > Verifying the first 10 rows is of limited utility and causes the DAG to be > executed non-lazily. If necessary, I believe this verification should be done > lazily on all rows. However, since the caller is providing a schema to > follow, I think it's acceptable to simply fail if the schema is incorrect. > https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L321-L325
[jira] [Created] (SPARK-11437) createDataFrame shouldn't .take() when provided schema
Jason White created SPARK-11437: --- Summary: createDataFrame shouldn't .take() when provided schema Key: SPARK-11437 URL: https://issues.apache.org/jira/browse/SPARK-11437 Project: Spark Issue Type: Improvement Components: PySpark Reporter: Jason White

When creating a DataFrame from an RDD in PySpark, `createDataFrame` calls `.take(10)` to verify the first 10 rows of the RDD match the provided schema. Similar to https://issues.apache.org/jira/browse/SPARK-8070, but that issue affected cases where a schema was not provided.

Verifying the first 10 rows is of limited utility and causes the DAG to be executed non-lazily. If necessary, I believe this verification should be done lazily on all rows. However, since the caller is providing a schema to follow, I think it's acceptable to simply fail if the schema is incorrect.

https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L321-L325
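The lazy verification suggested above can be illustrated with a plain-Python sketch (hypothetical names and a list of Python types standing in for a Spark schema; this is not PySpark's actual createDataFrame code): wrap the rows in a generator that checks each row against the expected types only as it is consumed, rather than forcing an eager `.take(10)` pass up front.

```python
def verify_rows_lazily(rows, field_types):
    """Yield rows one at a time, checking each against the expected
    field types only as it is consumed -- no eager up-front pass.

    Hypothetical sketch: field_types is a list of Python types
    standing in for a Spark schema.
    """
    for i, row in enumerate(rows):
        if len(row) != len(field_types):
            raise ValueError(
                f"row {i}: expected {len(field_types)} fields, got {len(row)}")
        for value, expected in zip(row, field_types):
            if not isinstance(value, expected):
                raise TypeError(
                    f"row {i}: {value!r} is not of type {expected.__name__}")
        yield row

rows = iter([("a", 1), ("b", 2)])
checked = verify_rows_lazily(rows, [str, int])
# Nothing is read (and nothing is verified) until the caller
# actually consumes the generator, keeping the pipeline lazy.
result = list(checked)
```

A mismatched row fails at the point it is consumed, which matches the "simply fail if the schema is incorrect" behavior argued for above.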
[jira] [Commented] (SPARK-8453) Unioning two RDDs in PySpark doesn't spill to disk
[ https://issues.apache.org/jira/browse/SPARK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592541#comment-14592541 ] Jason White commented on SPARK-8453: Interestingly, if you repartition the RDDs to the same number of partitions before unioning them, there are no memory spikes at all:
```
profiler = sc.textFile('/user/jasonwhite/profiler')
profiler = profiler.repartition(profiler.getNumPartitions())
profiler_2 = sc.textFile('/user/jasonwhite/profiler')
profiler_2 = profiler_2.repartition(profiler_2.getNumPartitions())
z = profiler.union(profiler_2)
z.count()
```
Obviously I'd rather not repartition every dataset after loading it via `sc.textFile`. Any ideas as to what could be the issue?
> Unioning two RDDs in PySpark doesn't spill to disk
> --
>
> Key: SPARK-8453
> URL: https://issues.apache.org/jira/browse/SPARK-8453
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.4.0
> Reporter: Jason White
> Labels: memory, union
>
> When unioning 2 RDDs together in PySpark, spill limits do not seem to be recognized. Our YARN containers are frequently killed for exceeding memory limits for this reason.
> I have been able to reproduce this in the following simple scenario:
> - spark.executor.instances: 1, spark.executor.memory: 512m, spark.executor.cores: 20, spark.python.worker.reuse: false, spark.shuffle.spill: true, spark.yarn.executor.memoryOverhead: 5000
> (I recognize this is not a good setup - I set things up this way to explore this problem and make the symptom easier to isolate)
> I have a 1-billion-row dataset, split up evenly into 1000 partitions. Each partition contains exactly 1 million rows. Each row contains approximately 250 characters, +/- 10.
> I executed the following in a PySpark shell:
> ```
> profiler = sc.textFile('/user/jasonwhite/profiler')
> profiler_2 = sc.textFile('/user/jasonwhite/profiler')
> profiler.count()
> profiler_2.count()
> ```
> Total container memory utilization was between 2500 & 2800 MB over the total execution, with no spill. No problem.
> Then I executed:
> ```
> z = profiler.union(profiler_2)
> z.count()
> ```
> Memory utilization spiked immediately to between 4700 & 4900 MB over the course of execution, also with no spill. Big problem. Since we are setting our container memory sizes based in part on the Python spill limit, when these spill limits are not properly recognized, our containers are unexpectedly killed.
[jira] [Created] (SPARK-8453) Unioning two RDDs in PySpark doesn't spill to disk
Jason White created SPARK-8453: -- Summary: Unioning two RDDs in PySpark doesn't spill to disk Key: SPARK-8453 URL: https://issues.apache.org/jira/browse/SPARK-8453 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.4.0 Reporter: Jason White

When unioning 2 RDDs together in PySpark, spill limits do not seem to be recognized. Our YARN containers are frequently killed for exceeding memory limits for this reason.

I have been able to reproduce this in the following simple scenario:
- spark.executor.instances: 1, spark.executor.memory: 512m, spark.executor.cores: 20, spark.python.worker.reuse: false, spark.shuffle.spill: true, spark.yarn.executor.memoryOverhead: 5000

(I recognize this is not a good setup - I set things up this way to explore this problem and make the symptom easier to isolate)

I have a 1-billion-row dataset, split up evenly into 1000 partitions. Each partition contains exactly 1 million rows. Each row contains approximately 250 characters, +/- 10.

I executed the following in a PySpark shell:
```
profiler = sc.textFile('/user/jasonwhite/profiler')
profiler_2 = sc.textFile('/user/jasonwhite/profiler')
profiler.count()
profiler_2.count()
```
Total container memory utilization was between 2500 & 2800 MB over the total execution, with no spill. No problem.

Then I executed:
```
z = profiler.union(profiler_2)
z.count()
```
Memory utilization spiked immediately to between 4700 & 4900 MB over the course of execution, also with no spill. Big problem. Since we are setting our container memory sizes based in part on the Python spill limit, when these spill limits are not properly recognized, our containers are unexpectedly killed.