[jira] [Assigned] (SPARK-28063) Replace deprecated `.newInstance()` in DSv2 `Catalogs`
    [ https://issues.apache.org/jira/browse/SPARK-28063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28063:
------------------------------------

    Assignee: (was: Apache Spark)

> Replace deprecated `.newInstance()` in DSv2 `Catalogs`
> -------------------------------------------------------
>
>                 Key: SPARK-28063
>                 URL: https://issues.apache.org/jira/browse/SPARK-28063
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Dongjoon Hyun
>            Priority: Minor
>
> This issue aims to replace the deprecated `.newInstance()` calls in DSv2 `Catalogs` and to distinguish plugin class errors more clearly.
> SPARK-25984 removed all usages of the deprecated `.newInstance()` on Nov 10, 2018, but SPARK-24252 added a new one on March 8, 2019.
[jira] [Assigned] (SPARK-28063) Replace deprecated `.newInstance()` in DSv2 `Catalogs`
    [ https://issues.apache.org/jira/browse/SPARK-28063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28063:
------------------------------------

    Assignee: Apache Spark

> Replace deprecated `.newInstance()` in DSv2 `Catalogs`
> -------------------------------------------------------
>
>                 Key: SPARK-28063
>                 URL: https://issues.apache.org/jira/browse/SPARK-28063
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Dongjoon Hyun
>            Assignee: Apache Spark
>            Priority: Minor
>
> This issue aims to replace the deprecated `.newInstance()` calls in DSv2 `Catalogs` and to distinguish plugin class errors more clearly.
> SPARK-25984 removed all usages of the deprecated `.newInstance()` on Nov 10, 2018, but SPARK-24252 added a new one on March 8, 2019.
[jira] [Created] (SPARK-28063) Replace deprecated `.newInstance()` in DSv2 `Catalogs`
Dongjoon Hyun created SPARK-28063:
--------------------------------------

             Summary: Replace deprecated `.newInstance()` in DSv2 `Catalogs`
                 Key: SPARK-28063
                 URL: https://issues.apache.org/jira/browse/SPARK-28063
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Dongjoon Hyun


This issue aims to replace the deprecated `.newInstance()` calls in DSv2 `Catalogs` and to distinguish plugin class errors more clearly.

SPARK-25984 removed all usages of the deprecated `.newInstance()` on Nov 10, 2018, but SPARK-24252 added a new one on March 8, 2019.
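For reference, the usual replacement pattern for the deprecated call looks roughly like this (a minimal Scala sketch, not the actual Catalogs patch; `pluginClassName` and its value are hypothetical examples):

{code:scala}
// Sketch of the standard replacement pattern; not the actual Catalogs patch.
// `pluginClassName` is a hypothetical example value.
val pluginClassName = "com.example.MyCatalogPlugin"
val clazz = Class.forName(pluginClassName)

// Deprecated since Java 9: propagates constructor exceptions unchecked.
// val plugin = clazz.newInstance()

// Preferred: constructor failures surface as InvocationTargetException,
// which lets the caller distinguish "constructor threw" from
// "class not instantiable" -- the error-distinguishing goal above.
val plugin = clazz.getDeclaredConstructor().newInstance()
{code}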
[jira] [Commented] (SPARK-23160) Add window.sql
    [ https://issues.apache.org/jira/browse/SPARK-23160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864931#comment-16864931 ]

Dongjoon Hyun commented on SPARK-23160:
---------------------------------------

You're welcome, [~DylanGuedes].

> Add window.sql
> --------------
>
>                 Key: SPARK-23160
>                 URL: https://issues.apache.org/jira/browse/SPARK-23160
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Tests
>    Affects Versions: 3.0.0
>            Reporter: Xingbo Jiang
>            Priority: Minor
>
> In this ticket, we plan to add the regression test cases of https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/window.sql.
[jira] [Assigned] (SPARK-23160) Add window.sql
    [ https://issues.apache.org/jira/browse/SPARK-23160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23160:
------------------------------------

    Assignee: Apache Spark

> Add window.sql
> --------------
>
>                 Key: SPARK-23160
>                 URL: https://issues.apache.org/jira/browse/SPARK-23160
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Tests
>    Affects Versions: 3.0.0
>            Reporter: Xingbo Jiang
>            Assignee: Apache Spark
>            Priority: Minor
>
> In this ticket, we plan to add the regression test cases of https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/window.sql.
[jira] [Assigned] (SPARK-23160) Add window.sql
    [ https://issues.apache.org/jira/browse/SPARK-23160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23160:
------------------------------------

    Assignee: (was: Apache Spark)

> Add window.sql
> --------------
>
>                 Key: SPARK-23160
>                 URL: https://issues.apache.org/jira/browse/SPARK-23160
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Tests
>    Affects Versions: 3.0.0
>            Reporter: Xingbo Jiang
>            Priority: Minor
>
> In this ticket, we plan to add the regression test cases of https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/window.sql.
[jira] [Assigned] (SPARK-28062) HuberAggregator copies coefficients vector every time an instance is added
    [ https://issues.apache.org/jira/browse/SPARK-28062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28062:
------------------------------------

    Assignee: (was: Apache Spark)

> HuberAggregator copies coefficients vector every time an instance is added
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-28062
>                 URL: https://issues.apache.org/jira/browse/SPARK-28062
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Andrew Crosby
>            Priority: Major
>
> Every time an instance is added to the HuberAggregator, a copy of the coefficients vector is created (see code snippet below). This causes a performance degradation, which is particularly severe when the instances have long sparse feature vectors.
> {code:scala}
> def add(instance: Instance): HuberAggregator = {
>   instance match { case Instance(label, weight, features) =>
>     require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
>       s" Expecting $numFeatures but got ${features.size}.")
>     require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
>     if (weight == 0.0) return this
>     val localFeaturesStd = bcFeaturesStd.value
>     val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
>     val localGradientSumArray = gradientSumArray
>     // Snip
> }
> {code}
> The LeastSquaresAggregator class avoids this performance issue via the use of transient lazy class variables to store such reused values. Applying a similar approach to HuberAggregator gives a significant speed boost. Running the script below locally on my machine gives the following timing results:
> {noformat}
> Current implementation:
> Time(s): 540.1439919471741
> Iterations: 26
> Intercept: 0.518109382890512
> Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]
>
> Modified implementation to match LeastSquaresAggregator:
> Time(s): 46.82946586608887
> Iterations: 26
> Intercept: 0.5181093828893774
> Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
> {noformat}
> {code:python}
> from random import random, randint, seed
> import time
> from pyspark.ml.feature import OneHotEncoder
> from pyspark.ml.regression import LinearRegression
> from pyspark.sql import SparkSession
>
> seed(0)
> spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
> df = spark.createDataFrame([[randint(0, 10), random()] for i in range(10)], ["category", "target"])
> ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
> lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)
> start = time.time()
> model = lr.fit(ohe.transform(df))
> end = time.time()
> print("Time(s): " + str(end - start))
> print("Iterations: " + str(model.summary.totalIterations))
> print("Intercept: " + str(model.intercept))
> print("Coefficients: " + str(list(model.coefficients)[0:10]))
> {code}
[jira] [Assigned] (SPARK-28062) HuberAggregator copies coefficients vector every time an instance is added
    [ https://issues.apache.org/jira/browse/SPARK-28062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28062:
------------------------------------

    Assignee: Apache Spark

> HuberAggregator copies coefficients vector every time an instance is added
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-28062
>                 URL: https://issues.apache.org/jira/browse/SPARK-28062
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Andrew Crosby
>            Assignee: Apache Spark
>            Priority: Major
>
> Every time an instance is added to the HuberAggregator, a copy of the coefficients vector is created (see code snippet below). This causes a performance degradation, which is particularly severe when the instances have long sparse feature vectors.
> {code:scala}
> def add(instance: Instance): HuberAggregator = {
>   instance match { case Instance(label, weight, features) =>
>     require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
>       s" Expecting $numFeatures but got ${features.size}.")
>     require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
>     if (weight == 0.0) return this
>     val localFeaturesStd = bcFeaturesStd.value
>     val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
>     val localGradientSumArray = gradientSumArray
>     // Snip
> }
> {code}
> The LeastSquaresAggregator class avoids this performance issue via the use of transient lazy class variables to store such reused values. Applying a similar approach to HuberAggregator gives a significant speed boost. Running the script below locally on my machine gives the following timing results:
> {noformat}
> Current implementation:
> Time(s): 540.1439919471741
> Iterations: 26
> Intercept: 0.518109382890512
> Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]
>
> Modified implementation to match LeastSquaresAggregator:
> Time(s): 46.82946586608887
> Iterations: 26
> Intercept: 0.5181093828893774
> Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
> {noformat}
> {code:python}
> from random import random, randint, seed
> import time
> from pyspark.ml.feature import OneHotEncoder
> from pyspark.ml.regression import LinearRegression
> from pyspark.sql import SparkSession
>
> seed(0)
> spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
> df = spark.createDataFrame([[randint(0, 10), random()] for i in range(10)], ["category", "target"])
> ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
> lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)
> start = time.time()
> model = lr.fit(ohe.transform(df))
> end = time.time()
> print("Time(s): " + str(end - start))
> print("Iterations: " + str(model.summary.totalIterations))
> print("Intercept: " + str(model.intercept))
> print("Coefficients: " + str(list(model.coefficients)[0:10]))
> {code}
[jira] [Created] (SPARK-28062) HuberAggregator copies coefficients vector every time an instance is added
Andrew Crosby created SPARK-28062:
--------------------------------------

             Summary: HuberAggregator copies coefficients vector every time an instance is added
                 Key: SPARK-28062
                 URL: https://issues.apache.org/jira/browse/SPARK-28062
             Project: Spark
          Issue Type: Bug
          Components: ML
    Affects Versions: 3.0.0
            Reporter: Andrew Crosby


Every time an instance is added to the HuberAggregator, a copy of the coefficients vector is created (see code snippet below). This causes a performance degradation, which is particularly severe when the instances have long sparse feature vectors.

{code:scala}
def add(instance: Instance): HuberAggregator = {
  instance match { case Instance(label, weight, features) =>
    require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
      s" Expecting $numFeatures but got ${features.size}.")
    require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
    if (weight == 0.0) return this
    val localFeaturesStd = bcFeaturesStd.value
    val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
    val localGradientSumArray = gradientSumArray
    // Snip
}
{code}

The LeastSquaresAggregator class avoids this performance issue via the use of transient lazy class variables to store such reused values. Applying a similar approach to HuberAggregator gives a significant speed boost. Running the script below locally on my machine gives the following timing results:

{noformat}
Current implementation:
Time(s): 540.1439919471741
Iterations: 26
Intercept: 0.518109382890512
Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]

Modified implementation to match LeastSquaresAggregator:
Time(s): 46.82946586608887
Iterations: 26
Intercept: 0.5181093828893774
Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
{noformat}

{code:python}
from random import random, randint, seed
import time
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

seed(0)
spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
df = spark.createDataFrame([[randint(0, 10), random()] for i in range(10)], ["category", "target"])
ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)
start = time.time()
model = lr.fit(ohe.transform(df))
end = time.time()
print("Time(s): " + str(end - start))
print("Iterations: " + str(model.summary.totalIterations))
print("Intercept: " + str(model.intercept))
print("Coefficients: " + str(list(model.coefficients)[0:10]))
{code}
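A minimal sketch of the fix direction described above (an assumption based on the LeastSquaresAggregator comparison, not the actual patch): cache the unpacked broadcast values in a transient lazy val, so the copy happens once per deserialized aggregator rather than once per added instance. Names mirror the snippet in the description.

{code:scala}
// Sketch only: mirrors the LeastSquaresAggregator trick mentioned above,
// not the actual Spark patch. Names follow the snippet in the description.
class HuberAggregatorSketch(
    numFeatures: Int,
    bcParameters: org.apache.spark.broadcast.Broadcast[org.apache.spark.ml.linalg.Vector])
  extends Serializable {

  // Unpacked once per deserialized aggregator instead of once per add() call.
  // @transient keeps the cached array out of the task serialization payload.
  @transient private lazy val localCoefficients: Array[Double] =
    bcParameters.value.toArray.slice(0, numFeatures)

  def add(features: org.apache.spark.ml.linalg.Vector): this.type = {
    val coefs = localCoefficients // no per-instance copy here
    // ... gradient update using coefs ...
    this
  }
}
{code}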
[jira] [Resolved] (SPARK-28052) ArrayExists should follow the three-valued boolean logic.
    [ https://issues.apache.org/jira/browse/SPARK-28052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun resolved SPARK-28052.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24873
[https://github.com/apache/spark/pull/24873]

> ArrayExists should follow the three-valued boolean logic.
> ----------------------------------------------------------
>
>                 Key: SPARK-28052
>                 URL: https://issues.apache.org/jira/browse/SPARK-28052
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Takuya Ueshin
>            Assignee: Takuya Ueshin
>            Priority: Major
>             Fix For: 3.0.0
>
> Currently {{ArrayExists}} always returns boolean values (if the arguments are not null), but it should follow the three-valued boolean logic:
> - {{true}} if the predicate returns {{true}} for at least one element
> - otherwise, {{null}} if the predicate returns {{null}} for some element
> - otherwise, {{false}}
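For illustration, the three-valued semantics above can be modeled as a standalone Scala function (an illustrative sketch, not Spark's ArrayExists implementation), using `Option[Boolean]` to stand for SQL TRUE/FALSE/NULL:

{code:scala}
// Illustrative model of three-valued EXISTS; not Spark's implementation.
// None models SQL NULL; Some(true)/Some(false) model TRUE/FALSE.
def exists3VL(predicateResults: Seq[Option[Boolean]]): Option[Boolean] =
  if (predicateResults.contains(Some(true))) Some(true) // any TRUE wins
  else if (predicateResults.contains(None)) None        // otherwise NULL propagates
  else Some(false)                                      // all FALSE

// exists3VL(Seq(Some(false), None))       == None       (was `false` before the fix)
// exists3VL(Seq(Some(false), Some(true))) == Some(true)
// exists3VL(Seq(Some(false)))             == Some(false)
{code}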
[jira] [Assigned] (SPARK-28052) ArrayExists should follow the three-valued boolean logic.
    [ https://issues.apache.org/jira/browse/SPARK-28052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun reassigned SPARK-28052:
-------------------------------------

    Assignee: Takuya Ueshin

> ArrayExists should follow the three-valued boolean logic.
> ----------------------------------------------------------
>
>                 Key: SPARK-28052
>                 URL: https://issues.apache.org/jira/browse/SPARK-28052
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Takuya Ueshin
>            Assignee: Takuya Ueshin
>            Priority: Major
>
> Currently {{ArrayExists}} always returns boolean values (if the arguments are not null), but it should follow the three-valued boolean logic:
> - {{true}} if the predicate returns {{true}} for at least one element
> - otherwise, {{null}} if the predicate returns {{null}} for some element
> - otherwise, {{false}}
[jira] [Resolved] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
    [ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiangrui Meng resolved SPARK-26412.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24643
[https://github.com/apache/spark/pull/24643]

> Allow Pandas UDF to take an iterator of pd.DataFrames
> ------------------------------------------------------
>
>                 Key: SPARK-26412
>                 URL: https://issues.apache.org/jira/browse/SPARK-26412
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Weichen Xu
>            Priority: Major
>             Fix For: 3.0.0
>
> Pandas UDFs are the ideal connection between PySpark and DL model inference workloads. However, users need to load the model file before making predictions. It is common to see models of size ~100MB or bigger. If Pandas UDF execution is limited to a single batch, the same model must be reloaded for every batch in the same Python worker process, which is inefficient.
> We can provide users an iterator of batches in pd.DataFrame and let user code handle it:
> {code}
> @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
> def predict(batch_iter):
>     model = ... # load model
>     for batch in batch_iter:
>         yield model.predict(batch)
> {code}
> The type of each batch is:
> * a pd.Series if UDF is called with a single non-struct-type column
> * a tuple of pd.Series if UDF is called with more than one Spark DF column
> * a pd.DataFrame if UDF is called with a single StructType column
> Examples:
> {code}
> @pandas_udf(...)
> def evaluate(batch_iter):
>     model = ... # load model
>     for features, label in batch_iter:
>         pred = model.predict(features)
>         yield (pred - label).abs()
>
> df.select(evaluate(col("features"), col("label")).alias("err"))
> {code}
> {code}
> @pandas_udf(...)
> def evaluate(pdf_iter):
>     model = ... # load model
>     for pdf in pdf_iter:
>         pred = model.predict(pdf['x'])
>         yield (pred - pdf['y']).abs()
>
> df.select(evaluate(struct(col("features"), col("label"))).alias("err"))
> {code}
> If the UDF doesn't return the same number of records for the entire partition, the user should see an error. We don't require every yield to match the input batch size.
> Another benefit: with the iterator interface and Python's asyncio, users have the flexibility to implement data pipelining.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]
[jira] [Created] (SPARK-28061) Support for converting float/double to binary format
Yuming Wang created SPARK-28061:
-----------------------------------

             Summary: Support for converting float/double to binary format
                 Key: SPARK-28061
                 URL: https://issues.apache.org/jira/browse/SPARK-28061
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Yuming Wang


Examples:
{code:sql}
SELECT float4send('5e-20'::float4);
SELECT float4send('67e14'::float4);
SELECT float4send('985e15'::float4);
SELECT float4send('55895e-16'::float4);
SELECT float4send('7038531e-32'::float4);
SELECT float4send('702990899e-20'::float4);
{code}

float4send: https://github.com/postgres/postgres/blob/16d489b0fe058e527619f5e9d92fd7ca3c6c2994/src/backend/utils/adt/float.c#L314-L326
float8send: https://github.com/postgres/postgres/blob/16d489b0fe058e527619f5e9d92fd7ca3c6c2994/src/backend/utils/adt/float.c#L566-L578
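On the JVM side, the core conversion such a function needs is already available; here is a minimal Scala sketch (an assumption about the approach, not the eventual Spark implementation):

{code:scala}
// Sketch of float/double -> big-endian binary, the JVM analogue of what
// PostgreSQL's float4send/float8send emit. Not the Spark implementation.
import java.nio.ByteBuffer

def float4ToBinary(f: Float): Array[Byte] =
  ByteBuffer.allocate(4).putFloat(f).array() // ByteBuffer is big-endian by default

def float8ToBinary(d: Double): Array[Byte] =
  ByteBuffer.allocate(8).putDouble(d).array()

// Hex-encode for display, e.g.:
// float4ToBinary(5e-20f).map("%02x".format(_)).mkString
{code}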
[jira] [Commented] (SPARK-23160) Add window.sql
    [ https://issues.apache.org/jira/browse/SPARK-23160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864692#comment-16864692 ]

Dylan Guedes commented on SPARK-23160:
--------------------------------------

Thank you! I'll be working on this, then.

> Add window.sql
> --------------
>
>                 Key: SPARK-23160
>                 URL: https://issues.apache.org/jira/browse/SPARK-23160
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Tests
>    Affects Versions: 3.0.0
>            Reporter: Xingbo Jiang
>            Priority: Minor
>
> In this ticket, we plan to add the regression test cases of https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/window.sql.
[jira] [Commented] (SPARK-28058) Reading csv with DROPMALFORMED sometimes doesn't drop malformed records
    [ https://issues.apache.org/jira/browse/SPARK-28058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864674#comment-16864674 ]

Hyukjin Kwon commented on SPARK-28058:
--------------------------------------

From a cursory look, it seems this behaviour was inherited from the Univocity parser.

> Reading csv with DROPMALFORMED sometimes doesn't drop malformed records
> ------------------------------------------------------------------------
>
>                 Key: SPARK-28058
>                 URL: https://issues.apache.org/jira/browse/SPARK-28058
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.1, 2.4.3
>            Reporter: Stuart White
>            Priority: Minor
>              Labels: CSV, csv, csvparser
>
> The spark sql csv reader is not dropping malformed records as expected.
> Consider this file (fruit.csv). Notice it contains a header record, 3 valid records, and one malformed record.
> {noformat}
> fruit,color,price,quantity
> apple,red,1,3
> banana,yellow,2,4
> orange,orange,3,5
> xxx
> {noformat}
> If I read this file using the spark sql csv reader as follows, everything looks good. The malformed record is dropped.
> {noformat}
> scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").show(truncate=false)
> +------+------+-----+--------+
> |fruit |color |price|quantity|
> +------+------+-----+--------+
> |apple |red   |1    |3       |
> |banana|yellow|2    |4       |
> |orange|orange|3    |5       |
> +------+------+-----+--------+
> {noformat}
> However, if I select a subset of the columns, the malformed record is not dropped. The malformed data is placed in the first column, and the remaining column(s) are filled with nulls.
> {noformat}
> scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit).show(truncate=false)
> +------+
> |fruit |
> +------+
> |apple |
> |banana|
> |orange|
> |xxx   |
> +------+
>
> scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color).show(truncate=false)
> +------+------+
> |fruit |color |
> +------+------+
> |apple |red   |
> |banana|yellow|
> |orange|orange|
> |xxx   |null  |
> +------+------+
>
> scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color, 'price).show(truncate=false)
> +------+------+-----+
> |fruit |color |price|
> +------+------+-----+
> |apple |red   |1    |
> |banana|yellow|2    |
> |orange|orange|3    |
> |xxx   |null  |null |
> +------+------+-----+
> {noformat}
> And finally, if I manually select all of the columns, the malformed record is once again dropped.
> {noformat}
> scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color, 'price, 'quantity).show(truncate=false)
> +------+------+-----+--------+
> |fruit |color |price|quantity|
> +------+------+-----+--------+
> |apple |red   |1    |3       |
> |banana|yellow|2    |4       |
> |orange|orange|3    |5       |
> +------+------+-----+--------+
> {noformat}
> I would expect the malformed record(s) to be dropped regardless of which columns are being selected from the file.
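A possible workaround, hedged: the behaviour described above is tied to CSV column pruning, which Spark 2.4 lets you disable via the `spark.sql.csv.parser.columnPruning.enabled` flag so that the full row is parsed even under a narrow select. A sketch in spark-shell style, matching the snippets above:

{code:scala}
// Workaround sketch: disable CSV column pruning so malformed rows are
// judged against the full schema even when only some columns are selected.
// Uses spark.sql.csv.parser.columnPruning.enabled (added in Spark 2.4).
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv("fruit.csv")
  .select('fruit)          // the "xxx" row should now be dropped as malformed
  .show(truncate = false)
{code}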
[jira] [Assigned] (SPARK-27870) Flush each batch for pandas UDF (for improving pandas UDFs pipeline)
    [ https://issues.apache.org/jira/browse/SPARK-27870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-27870:
------------------------------------

    Assignee: Hyukjin Kwon

> Flush each batch for pandas UDF (for improving pandas UDFs pipeline)
> ---------------------------------------------------------------------
>
>                 Key: SPARK-27870
>                 URL: https://issues.apache.org/jira/browse/SPARK-27870
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Weichen Xu
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.0.0
>
> Flush each batch for pandas UDFs.
> This could improve performance when multiple pandas UDF plans are pipelined. When each batch is flushed in time, downstream pandas UDFs are pipelined as soon as possible, and the pipelining helps hide the downstream UDFs' computation time. For example: while the first UDF is computing on batch-3, the second pipelined UDF can be computing on batch-2, and the third pipelined UDF can be computing on batch-1.
> If we do not flush each batch in time, the downstream UDFs' pipeline will lag behind too much, which may increase the total processing time.
[jira] [Resolved] (SPARK-27870) Flush each batch for pandas UDF (for improving pandas UDFs pipeline)
    [ https://issues.apache.org/jira/browse/SPARK-27870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-27870.
----------------------------------
    Resolution: Fixed

Issue resolved by pull request 24826
[https://github.com/apache/spark/pull/24826]

> Flush each batch for pandas UDF (for improving pandas UDFs pipeline)
> ---------------------------------------------------------------------
>
>                 Key: SPARK-27870
>                 URL: https://issues.apache.org/jira/browse/SPARK-27870
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Weichen Xu
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.0.0
>
> Flush each batch for pandas UDFs.
> This could improve performance when multiple pandas UDF plans are pipelined. When each batch is flushed in time, downstream pandas UDFs are pipelined as soon as possible, and the pipelining helps hide the downstream UDFs' computation time. For example: while the first UDF is computing on batch-3, the second pipelined UDF can be computing on batch-2, and the third pipelined UDF can be computing on batch-1.
> If we do not flush each batch in time, the downstream UDFs' pipeline will lag behind too much, which may increase the total processing time.
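Schematically, the change amounts to flushing the output stream after each serialized batch instead of waiting for the buffer to fill. An illustrative Scala sketch of the pattern (the real change lives in Spark's Python UDF runner, not in user code):

{code:scala}
// Illustrative sketch of the flush-per-batch pattern described above;
// not the actual Spark change.
import java.io.{BufferedOutputStream, OutputStream}

def writeBatches(out: OutputStream, batches: Iterator[Array[Byte]]): Unit = {
  val buffered = new BufferedOutputStream(out)
  for (batch <- batches) {
    buffered.write(batch)
    // Without this flush, a batch can sit in the buffer until more data
    // arrives, stalling any downstream consumer waiting on it.
    buffered.flush()
  }
  buffered.flush()
}
{code}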
[jira] [Assigned] (SPARK-27418) Migrate Parquet to File Data Source V2
    [ https://issues.apache.org/jira/browse/SPARK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-27418:
------------------------------------

    Assignee: Gengliang Wang

> Migrate Parquet to File Data Source V2
> --------------------------------------
>
>                 Key: SPARK-27418
>                 URL: https://issues.apache.org/jira/browse/SPARK-27418
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Gengliang Wang
>            Assignee: Gengliang Wang
>            Priority: Major
>
[jira] [Resolved] (SPARK-27418) Migrate Parquet to File Data Source V2
    [ https://issues.apache.org/jira/browse/SPARK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-27418.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24327
[https://github.com/apache/spark/pull/24327]

> Migrate Parquet to File Data Source V2
> --------------------------------------
>
>                 Key: SPARK-27418
>                 URL: https://issues.apache.org/jira/browse/SPARK-27418
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Gengliang Wang
>            Assignee: Gengliang Wang
>            Priority: Major
>             Fix For: 3.0.0
>
[jira] [Resolved] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
    [ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell resolved SPARK-23128.
---------------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

> A new approach to do adaptive execution in Spark SQL
> -----------------------------------------------------
>
>                 Key: SPARK-23128
>                 URL: https://issues.apache.org/jira/browse/SPARK-23128
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Carson Wang
>            Assignee: Maryann Xue
>            Priority: Major
>             Fix For: 3.0.0
>
>         Attachments: AdaptiveExecutioninBaidu.pdf
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In DAGScheduler, a new API is added to support submitting a single map stage. The current implementation of adaptive execution in Spark SQL supports changing the reducer number at runtime. An Exchange coordinator is used to determine the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or multiple stages. The current implementation adds the ExchangeCoordinator while we are adding Exchanges. However, there are some limitations. First, it may cause additional shuffles that decrease performance; we can see this in the EnsureRequirements rule when it adds the ExchangeCoordinator. Secondly, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges, because we don't have a global picture of all the shuffle dependencies of a post-shuffle stage. For example, for a three-table join in a single stage, the same ExchangeCoordinator should be used across the three Exchanges, but currently two separate ExchangeCoordinators are added. Thirdly, with the current framework it is not easy to flexibly implement other adaptive-execution features, such as changing the execution plan or handling skewed joins at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL that addresses these limitations. The idea is described at [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]
[jira] [Assigned] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
    [ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell reassigned SPARK-23128:
-----------------------------------------

    Assignee: Maryann Xue

> A new approach to do adaptive execution in Spark SQL
> -----------------------------------------------------
>
>                 Key: SPARK-23128
>                 URL: https://issues.apache.org/jira/browse/SPARK-23128
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Carson Wang
>            Assignee: Maryann Xue
>            Priority: Major
>
>         Attachments: AdaptiveExecutioninBaidu.pdf
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In DAGScheduler, a new API is added to support submitting a single map stage. The current implementation of adaptive execution in Spark SQL supports changing the reducer number at runtime. An Exchange coordinator is used to determine the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or multiple stages. The current implementation adds the ExchangeCoordinator while we are adding Exchanges. However, there are some limitations. First, it may cause additional shuffles that decrease performance; we can see this in the EnsureRequirements rule when it adds the ExchangeCoordinator. Secondly, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges, because we don't have a global picture of all the shuffle dependencies of a post-shuffle stage. For example, for a three-table join in a single stage, the same ExchangeCoordinator should be used across the three Exchanges, but currently two separate ExchangeCoordinators are added. Thirdly, with the current framework it is not easy to flexibly implement other adaptive-execution features, such as changing the execution plan or handling skewed joins at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL that addresses these limitations. The idea is described at [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]
[jira] [Assigned] (SPARK-28042) Support mapping spark.local.dir to hostPath volume
    [ https://issues.apache.org/jira/browse/SPARK-28042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28042:
------------------------------------

    Assignee: (was: Apache Spark)

> Support mapping spark.local.dir to hostPath volume
> --------------------------------------------------
>
>                 Key: SPARK-28042
>                 URL: https://issues.apache.org/jira/browse/SPARK-28042
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.0.0
>            Reporter: Junjie Chen
>            Priority: Minor
>
> Currently, the k8s executor builder mounts spark.local.dir as emptyDir or memory. This should satisfy small workloads, but for heavy workloads like TPCDS both options can cause problems, such as pods being evicted due to disk pressure when using emptyDir, and OOM when using tmpfs.
> In particular, in cloud environments users may allocate a cluster with a minimal configuration and attach cloud storage when running a workload. In this case, we could specify multiple elastic storage volumes as spark.local.dir to accelerate spilling.
[jira] [Assigned] (SPARK-28042) Support mapping spark.local.dir to hostPath volume
    [ https://issues.apache.org/jira/browse/SPARK-28042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28042:
------------------------------------

    Assignee: Apache Spark

> Support mapping spark.local.dir to hostPath volume
> --------------------------------------------------
>
>                 Key: SPARK-28042
>                 URL: https://issues.apache.org/jira/browse/SPARK-28042
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.0.0
>            Reporter: Junjie Chen
>            Assignee: Apache Spark
>            Priority: Minor
>
> Currently, the k8s executor builder mounts spark.local.dir as emptyDir or memory. This should satisfy small workloads, but for heavy workloads like TPCDS both options can cause problems, such as pods being evicted due to disk pressure when using emptyDir, and OOM when using tmpfs.
> In particular, in cloud environments users may allocate a cluster with a minimal configuration and attach cloud storage when running a workload. In this case, we could specify multiple elastic storage volumes as spark.local.dir to accelerate spilling.
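One way to express the desired wiring with Spark's existing Kubernetes volume configs is sketched below (a sketch only, not necessarily the mechanism this ticket settles on; the volume name and paths are hypothetical examples):

{code:scala}
// Sketch of mapping spark.local.dir onto a hostPath volume via SparkConf.
// Volume name and mount paths are hypothetical; not the eventual patch.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.executor.volumes.hostPath.spark-local-1.mount.path", "/mnt/fast-disk")
  .set("spark.kubernetes.executor.volumes.hostPath.spark-local-1.mount.readOnly", "false")
  .set("spark.kubernetes.executor.volumes.hostPath.spark-local-1.options.path", "/mnt/fast-disk")
  // Point shuffle/spill traffic at the mounted hostPath instead of emptyDir.
  .set("spark.local.dir", "/mnt/fast-disk")
{code}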