[jira] [Commented] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct
[ https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217001#comment-17217001 ]

colin fang commented on SPARK-33184:

I notice there is an exclamation mark before `Project` in the physical plan. What does `!Project` mean?
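For what it's worth, the leading `!` in a plan string appears to be Spark's marker for an invalid operator, i.e. one that references attributes its children do not produce: here the Project needs x#712 while the FileScan below it only outputs y#713, which matches the "Couldn't find x#720 in [y#721]" runtime error in the description. A minimal sketch to reproduce the marker, assuming the 'test' parquet directory from the description exists:

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('test')
# The plan prints '!Project' because the projection refers to column x,
# which the scan underneath does not read; the '!' flags the missing input.
df.select(F.expr('y.a[x]')).explain()
{code}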
[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct
[ https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

colin fang updated SPARK-33184:
    Issue Type: Bug  (was: Improvement)
[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct
[ https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

colin fang updated SPARK-33184:
    Summary: spark doesn't read data source column if it is used as an index to an array under a struct  (was: spark doesn't read data source column if it is needed as an index to an array in a nested struct)
[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct
[ https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

colin fang updated SPARK-33184:
    Description:

{code:python}
df = spark.createDataFrame([[1, [[1, 2]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine; note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<y:struct<a:array<int>>>
{code}

The code works well if I

{code:python}
# manually select the column it misses
spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()

# use the element_at function
spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
{code}

  was:

{code:python}
df = spark.createDataFrame([[1, [[1, 2]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine; note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<y:struct<a:array<int>>>
{code}

The code works well if I
- manually select the column it misses: `spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()`
- or use the `F.element_at` function: `spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`
[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct
[ https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

colin fang updated SPARK-33184:
    Description:

{code:python}
df = spark.createDataFrame([[1, [[1, 2]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine; note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<y:struct<a:array<int>>>
{code}

The code works well if I
- manually select the column it misses: `spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()`
- or use the `F.element_at` function: `spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`

  was:

```
df = spark.createDataFrame([[1, [[1, 2]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
```

```
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine; note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<y:struct<a:array<int>>>
```

The code works well if I
- manually select the column it misses: `spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()`
- or use the `F.element_at` function: `spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`
[jira] [Created] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct
colin fang created SPARK-33184:

             Summary: spark doesn't read data source column if it is needed as an index to an array in a nested struct
                 Key: SPARK-33184
                 URL: https://issues.apache.org/jira/browse/SPARK-33184
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: colin fang

```
df = spark.createDataFrame([[1, [[1, 2]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
```

```
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine; note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<y:struct<a:array<int>>>
```

The code works well if I
- manually select the column it misses: `spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()`
- or use the `F.element_at` function: `spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`
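As a side note on the second workaround above: `element_at` uses 1-based indexing while the `[]` subscript is 0-based, which is why the expression adds 1 to x. A runnable sketch of that workaround, assuming the same 'test' dataset written above:

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('test')
# element_at(array, n) is 1-based, so shift the 0-based index x by one.
df.select(F.element_at('y.a', F.col('x') + 1).alias('y_a_at_x')).show()
{code}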
[jira] [Created] (SPARK-28148) repartition after join is not optimized away
colin fang created SPARK-28148:

             Summary: repartition after join is not optimized away
                 Key: SPARK-28148
                 URL: https://issues.apache.org/jira/browse/SPARK-28148
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: colin fang

Partitioning & sorting are usually retained after a join.

{code}
spark.conf.set('spark.sql.shuffle.partitions', '42')
df1 = spark.range(500, numPartitions=5)
df2 = spark.range(1000, numPartitions=5)
df3 = spark.range(2000, numPartitions=5)

# Reuses previous partitions & sort.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 500, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 1000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 2000, step=1, splits=5)
{code}

However, here the partitioning persists through a left join but the sort does not:

{code}
df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 500, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 1000, step=1, splits=5)
{code}

And here the partitioning does not persist through an inner join:

{code}
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 500, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 1000, step=1, splits=5)
{code}
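In case it helps to narrow the report down: the sort-merge join output is already hash-partitioned on id, so it is the explicit repartition('id') that introduces the extra Exchange. A minimal sketch of the comparison, using the same DataFrames as above:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '42')

df1 = spark.range(500, numPartitions=5)
df2 = spark.range(1000, numPartitions=5)
joined = df1.join(df2, on='id')

# With the explicit repartition, the plan gains an Exchange hashpartitioning
# step on top of the already hash-partitioned join output.
joined.repartition('id').sortWithinPartitions('id').explain()

# Dropping the redundant repartition relies on the join's existing partitioning.
joined.sortWithinPartitions('id').explain()
{code}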
[jira] [Updated] (SPARK-27759) Do not auto cast array to np.array in vectorized udf
[ https://issues.apache.org/jira/browse/SPARK-27759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

colin fang updated SPARK-27759:
    Description:

{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}

Each element in x is a list of lists, as expected.

{code:java}
df.toPandas()['x']
# 0 [[0.08669612955959993, 0.32624430522634495, 0...
# 1 [[0.29838166086156914, 0.008550172904516762, 0...
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}

{code:java}
def my_udf(x):
    # Hack to see what's inside a udf
    raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, np.stack(x.values).shape)
    return pd.Series(x.values)

my_udf = F.pandas_udf(my_udf, returnType=DoubleType())
df.coalesce(1).withColumn('y', my_udf('x')).show()
# Exception: ((11,), (3,), (5,), (11, 3))
{code}

A batch (11) of `x` is converted to a pd.Series; however, each element in the pd.Series is now a numpy 1d array of numpy 1d arrays. It is inconvenient to work with nested 1d numpy arrays in practice in a udf.

For example, I need an ndarray of shape (11, 3, 5) in the udf so that I can make use of numpy's vectorized operations. If I were given the list of lists intact, I could simply do `np.stack(x.values)`. However, that doesn't work here because what I receive is a nested numpy 1d array.

  was:

{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}

Each element in x is a list of lists, as expected.

{code:java}
df.toPandas()['x']
# 0 [[0.08669612955959993, 0.32624430522634495, 0...
# 1 [[0.29838166086156914, 0.008550172904516762, 0...
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}

{code:java}
def my_udf(x):
    # Hack to see what's inside a udf
    raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, np.stack(x.values).shape)
    return pd.Series(x.values)

my_udf = pandas_udf(dot_product, returnType=DoubleType())
df.withColumn('y', my_udf('x')).show()
# Exception: ((2,), (3,), (5,), (2, 3))
{code}

A batch (2) of `x` is converted to a pd.Series; however, each element in the pd.Series is now a numpy 1d array of numpy 1d arrays. It is inconvenient to work with nested 1d numpy arrays in practice in a udf.

For example, I need an ndarray of shape (2, 3, 5) in the udf so that I can make use of numpy's vectorized operations. If I were given the list of lists intact, I could simply do `np.stack(x.values)`. However, that doesn't work here because what I receive is a nested numpy 1d array.
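A sketch of a possible workaround inside the UDF, rebuilding a dense ndarray from the nested 1d object arrays; the function and output column names are illustrative and not part of the report, and it assumes the df defined in the description:

{code:python}
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def row_sum(x):
    # Each element of the Series is a 1d object array of 1d arrays, so stack
    # twice to recover a dense (batch, 3, 5) ndarray before vectorized work.
    dense = np.stack([np.stack(row) for row in x.values])
    return pd.Series(dense.sum(axis=(1, 2)))

row_sum_udf = F.pandas_udf(row_sum, returnType=DoubleType())
df.coalesce(1).withColumn('y_sum', row_sum_udf('x')).show()
{code}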
[jira] [Created] (SPARK-27759) Do not auto cast array to np.array in vectorized udf
colin fang created SPARK-27759:

             Summary: Do not auto cast array to np.array in vectorized udf
                 Key: SPARK-27759
                 URL: https://issues.apache.org/jira/browse/SPARK-27759
             Project: Spark
          Issue Type: Improvement
          Components: PySpark, SQL
    Affects Versions: 2.4.3
            Reporter: colin fang

{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}

Each element in x is a list of list, as expected.

{code:java}
df.toPandas()['x']
# 0 [[0.08669612955959993, 0.32624430522634495, 0...
# 1 [[0.29838166086156914, 0.008550172904516762, 0...
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}

{code:java}
def my_udf(x):
    # Hack to see what's inside a udf
    raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, np.stack(x.values).shape)
    return pd.Series(x.values)

my_udf = pandas_udf(dot_product, returnType=DoubleType())
df.withColumn('y', my_udf('x')).show()
Exception: ((2,), (3,), (5,), (2, 3))
{code}

A batch (2) of `x` is converted to pd.Series, however, each element in the pd.Series is now a numpy 1d array of numpy 1d array. It is inconvenient to work with nested 1d numpy array in practice in a udf.

For example, I need a ndarray of shape (2, 3, 5) in udf, so that I can make use of the numpy vectorized operations. If I was given a list of list intact, I can simply do `np.stack(x.values)`. However, it doesn't work here as what I received is a nested numpy 1d array.
[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.
[ https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830504#comment-16830504 ]

colin fang commented on SPARK-17859:

The above case works for me in v2.4.

{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
df_large = spark.range(1e6)
df_small = F.broadcast(spark.range(10).coalesce(1)).cache()
df_large.join(df_small, "id").explain()

== Physical Plan ==
*(2) Project [id#0L]
+- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
   :- *(2) Range (0, 100, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *(1) InMemoryTableScan [id#2L]
            +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Coalesce 1
                     +- *(1) Range (0, 10, step=1, splits=4)
{code}

However, I have definitely seen cases where `F.broadcast` is ignored for a cached dataframe. (I am unable to find a minimal example, though.)

> persist should not impede with spark's ability to perform a broadcast join.
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-17859
>                 URL: https://issues.apache.org/jira/browse/SPARK-17859
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.0
>        Environment: spark 2.0.0, Linux RedHat
>            Reporter: Franck Tago
>            Priority: Major
>
> I am using Spark 2.0.0.
> My investigation leads me to conclude that calling persist could prevent a broadcast join from happening.
>
> Example
> Case 1: no persist call
> var df1 = spark.range(100).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
> var df2 = spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
> df1.join(df2, $"id1" === $"id2").explain
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 100, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
>    +- *Project [id#120L AS id2#123L]
>       +- *Range (0, 1000, splits=2)
>
> Case 2: persist call
> df1.persist.join(df2, $"id1" === $"id2").explain
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> :     +- InMemoryTableScan [id1#3L]
> :           +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
> :                 +- *Project [id#0L AS id1#3L]
> :                    +- *Range (0, 100, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>    +- Exchange hashpartitioning(id2#9L, 10)
>       +- InMemoryTableScan [id2#9L]
>             +- InMemoryRelation [id2#9L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *Project [id#6L AS id2#9L]
>                      +- *Range (0, 1000, splits=2)
>
> Why does the persist call prevent the broadcast join?
> My opinion is that it should not.
> I was made aware that the persist call is lazy and that might have something to do with it, but I still contend that it should not.
> Losing broadcast joins is really costly.
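For the cases where the hint on the cached DataFrame gets lost, one sketch worth trying is to apply the broadcast hint at join time rather than before the cache; whether the hint always survives the InMemoryRelation is not something I can confirm, so treat this as an experiment rather than a fix:

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
# Disable size-based auto broadcasting so only the explicit hint can apply.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df_large = spark.range(1000000)
df_small = spark.range(10).coalesce(1).cache()

# Hint the cached relation inside the join itself; explain() should show a
# BroadcastHashJoin over the InMemoryTableScan if the hint is honored.
df_large.join(F.broadcast(df_small), "id").explain()
{code}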
[jira] [Created] (SPARK-27559) Nullable in a given schema is not respected when reading from parquet
colin fang created SPARK-27559:

             Summary: Nullable in a given schema is not respected when reading from parquet
                 Key: SPARK-27559
                 URL: https://issues.apache.org/jira/browse/SPARK-27559
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.4.2
            Reporter: colin fang

Even if I specify a schema when reading from parquet, the declared nullability is not respected.

{code:java}
spark.range(10, numPartitions=1).write.mode('overwrite').parquet('tmp')

df1 = spark.read.parquet('tmp')
df1.printSchema()
# root
#  |-- id: long (nullable = true)

df2 = spark.read.schema(StructType([StructField('id', LongType(), False)])).parquet('tmp')
df2.printSchema()
# root
#  |-- id: long (nullable = true)
{code}
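A possible workaround sketch, assuming the goal is simply to end up with a non-nullable field: re-apply the schema via createDataFrame after the read, which does respect the declared nullability (at the cost of a round trip through the RDD API):

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField('id', LongType(), False)])

df = spark.read.schema(schema).parquet('tmp')
# Round-trip through createDataFrame so the declared nullable=False sticks.
df_strict = spark.createDataFrame(df.rdd, schema)
df_strict.printSchema()
# root
#  |-- id: long (nullable = false)
{code}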
[jira] [Created] (SPARK-27217) Nested schema pruning doesn't work for aggregation e.g. `sum`.
colin fang created SPARK-27217:

             Summary: Nested schema pruning doesn't work for aggregation e.g. `sum`.
                 Key: SPARK-27217
                 URL: https://issues.apache.org/jira/browse/SPARK-27217
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: colin fang

Since SPARK-4502 is fixed, I would expect queries such as `select sum(b.x)` not to have to read the other nested fields.

{code:python}
rdd = spark.range(1000).rdd.map(lambda x: [x.id+3, [x.id+1, x.id-1]])
df = spark.createDataFrame(rdd, schema='a:int,b:struct<x:int,y:int>')
df.repartition(1).write.mode('overwrite').parquet('test.parquet')
df = spark.read.parquet('test.parquet')

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int,y:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.selectExpr('sum(b.x)').explain()
# ReadSchema: struct<b:struct<x:int,y:int>>
{code}
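A sketch of a workaround that may help until the pruning rule covers aggregates: project the nested field first and aggregate the projected column, so that only a narrow Project sits between the Aggregate and the scan (untested across versions; uses the same 'test.parquet' data as above):

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')

df = spark.read.parquet('test.parquet')
# Selecting b.x before the aggregation leaves only a narrow projection above
# the scan, which the nested schema pruning rule can handle.
df.select(F.col('b.x').alias('x')).agg(F.sum('x')).explain()
# ReadSchema should then list only b.x, e.g. struct<b:struct<x:int>>
{code}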