Filtering based on a float value with more than one decimal place not working correctly in Pyspark dataframe

2018-09-25 Thread Meethu Mathew
Hi all,

I tried the following code and the output was not as expected.

from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([StructField('Id', StringType(), False),
                     StructField('Value', FloatType(), False)])
df_test = spark.createDataFrame([('a', 5.0), ('b', 1.236), ('c', -0.31)], schema)

df_test


Output: DataFrame[Id: string, Value: float]
[image: image.png]
But when the value is given as a string, it worked.

[image: image.png]
I tried again with a floating-point number that has one decimal place, and it
worked.
[image: image.png]
And when the equals operation is changed to greater-than or less-than, it works
with numbers that have more than one decimal place.
[image: image.png]
Is this a bug?
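
Since the screenshots above are not preserved in the archive, here is a minimal
sketch of the comparisons being described, using the DataFrame defined above
(the inline comments are my reading of the likely cause, not a confirmed
diagnosis):

    # Equality against the Python double literal 1.236 likely fails because
    # the float column value, promoted to double, is not exactly 1.236.
    df_test.filter(df_test.Value == 1.236).show()      # no rows returned

    # With a string literal the comparison matches (presumably the literal is
    # cast to float before comparing).
    df_test.filter(df_test.Value == '1.236').show()    # row 'b' returned

    # One decimal place, and range comparisons, behave as expected.
    df_test.filter(df_test.Value == 5.0).show()        # row 'a' returned
    df_test.filter(df_test.Value > 1.2).show()         # rows 'a' and 'b' returned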

Regards,
Meethu Mathew


RE: Python kubernetes spark 2.4 branch

2018-09-25 Thread Garlapati, Suryanarayana (Nokia - IN/Bangalore)
Hi Ilan/ Yinan,
Yes, my test case is similar to the one described in
https://issues.apache.org/jira/browse/SPARK-24736

My spark-submit is as follows:
./spark-submit --deploy-mode cluster --master 
k8s://https://10.75.145.23:8443 --conf 
spark.app.name=spark-py --properties-file /tmp/program_files/spark_py.conf 
--py-files http://10.75.145.25:80/Spark/getNN.py 
http://10.75.145.25:80/Spark/test.py

Following is the error observed:

+ exec /sbin/tini -s -- /opt/spark/bin/spark-submit --conf 
spark.driver.bindAddress=192.168.1.22 --deploy-mode client --properties-file 
/opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner 
http://10.75.145.25:80/Spark/test.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/opt/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/opt/spark/jars/phoenix-4.13.1-HBase-1.3-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Traceback (most recent call last):
File "/tmp/spark-4c428c98-e123-4c29-a9f5-ef85f207e229/test.py", line 13, in <module>
    from getNN import *
ImportError: No module named getNN
2018-09-25 16:19:57 INFO ShutdownHookManager:54 - Shutdown hook called
2018-09-25 16:19:57 INFO ShutdownHookManager:54 - Deleting directory 
/tmp/spark-4c428c98-e123-4c29-a9f5-ef85f207e229

I am observing the same kind of behaviour as mentioned in
https://issues.apache.org/jira/browse/SPARK-24736 (the file is downloaded and
available in the pod).

The same happens with local files as well:

./spark-submit --deploy-mode cluster --master 
k8s://https://10.75.145.23:8443 --conf 
spark.app.name=spark-py --properties-file /tmp/program_files/spark_py.conf 
--py-files ./getNN.py http://10.75.145.25:80/Spark/test.py

test.py has dependencies on getNN.py.


But the same works on the Spark 2.2 k8s branch.
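
Not part of the original report, but as a possible diagnostic/workaround
sketch: registering the dependency explicitly with addPyFile inside test.py
(assuming the HTTP URL above is reachable from the driver pod) would show
whether the problem is limited to the --py-files distribution path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Download the dependency and add it to sys.path on the driver; it is also
    # shipped to executors for use inside UDFs.
    spark.sparkContext.addPyFile("http://10.75.145.25:80/Spark/getNN.py")

    from getNN import *  # import only after addPyFile has run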


Regards
Surya

From: Ilan Filonenko 
Sent: Wednesday, September 26, 2018 2:06 AM
To: liyinan...@gmail.com
Cc: Garlapati, Suryanarayana (Nokia - IN/Bangalore) 
; Spark dev list ; 
u...@spark.apache.org
Subject: Re: Python kubernetes spark 2.4 branch

Is this in reference to: https://issues.apache.org/jira/browse/SPARK-24736 ?

On Tue, Sep 25, 2018 at 12:38 PM Yinan Li 
mailto:liyinan...@gmail.com>> wrote:
Can you give more details on how you ran your app, did you build your own 
image, and which image are you using?

On Tue, Sep 25, 2018 at 10:23 AM Garlapati, Suryanarayana (Nokia - 
IN/Bangalore) 
mailto:suryanarayana.garlap...@nokia.com>> 
wrote:
Hi,
I am trying to run spark python testcases on k8s based on tag spark-2.4-rc1. 
When the dependent files are passed through the --py-files option, they are not 
getting resolved by the main python script. Please let me know, is this a known 
issue?

Regards
Surya



Re: [DISCUSS] Cascades style CBO for Spark SQL

2018-09-25 Thread Xiao Li
Hi, Xiaoju,

Thanks for sending this to the dev list. The current join reordering rule
is just a stats-based optimizer rule. Either top-down or bottom-up
optimization can achieve the same level of optimized plans; DB2 uses
bottom-up. In the future, we plan to move the stats-based join reordering
rule to the cost-based planner, which is the right place for this rule based
on the original design of Spark SQL.

Actually, building a good cost model is much more difficult than
implementing such a classic framework, especially when Spark does not own
the data. Also, we need to compute incremental stats instead of always
recomputing the stats.

Cheers,

Xiao



吴晓菊 (Xiaoju Wu) wrote on Monday, September 24, 2018 at 7:53 PM:

> Hi All,
>
> The current Spark CBO implements a cost-based multi-way join reordering
> algorithm based on the System-R paper [Access Path, SIGMOD '79].
> When building m-way joins, it uses a bottom-up approach: it puts all items
> (basic joined nodes) into level 0, then builds all two-way joins at level 1
> from plans at level 0 (single items), then builds all three-way joins, etc.
> The algorithm considers all combinations, including left-deep trees,
> bushy trees, and right-deep trees. It also prunes Cartesian product
> candidates.
>
> However, we have found several *limitations* in the current CBO
> implementation:
> 1. The current CBO is a rule in the logical phase; it outputs only one
> logical plan to the physical optimization phase, so we cannot be sure the
> best plan in the logical phase is still the best after physical
> optimization.
>
> 2. In the current bottom-up approach, we keep only one best plan per
> level, so we cannot guarantee that the overall best plan is reachable from
> the per-level best plans.
>
> 3. The current cost formula, cost = weight * cardinality + (1.0 - weight)
> * size, where the first term roughly corresponds to CPU cost and the
> second to I/O cost, is oversimplified (see the short sketch after this
> quoted message). It treats all join implementations the same way and does
> not take shuffle and sort cost into consideration, even though Shuffle
> Exchange is one of the heaviest physical operators in Spark SQL.
>
> 4. Equivalent join conditions are not supported. For example, (A join B
> join C on a=b and b=c) can be reordered to (A join C join B on a=c and
> c=b), which may be more efficient. But in the current implementation, the
> condition "a=c" is never derived, so "A join C" is treated as a Cartesian
> product and excluded.
>
> The bottom-up approach originated in the System-R optimizer (1979). It
> quickly became a standard, and many modern relational database optimizers
> are “System-R style”, for example Oracle, PostgreSQL, MySQL, and DB2.
>
> Over time, new optimizer styles were invented: Volcano (1993) and Cascades
> (1995). They are less famous than System-R but are still widely used in
> practice: Microsoft SQL Server, Greenplum Orca, and Apache Calcite. They
> implement a top-down transformational search algorithm and provide an
> extensible optimization framework.
>
> A top-down optimization framework can help us address the above
> limitations, since it has a more complete search space and combines the
> logical and physical phases for more accurate cost estimation. As for the
> cost of enumerating all alternative plans, Cascades also provides pruning
> to reduce the search space.
>
> What about implementing *a new Cascades-style CBO for Spark SQL*?
> It could be a new rule in the current "Planner" which reads a logical plan
> after the heuristic rules and outputs the least-cost physical plan after
> applying the reordering and physical implementation rules.
>
> Xiaoju Wu
> Phone:+86 17717640807
>
>
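
An illustrative sketch (not Spark's internal code) of the weighted cost
comparison referenced in limitation 3 of the quoted message; the weight and
the plan statistics are made-up example values:

    # Toy version of cost = weight * cardinality + (1.0 - weight) * size,
    # where the first term stands in for CPU cost and the second for I/O cost.
    def plan_cost(cardinality, size, weight=0.7):
        return weight * cardinality + (1.0 - weight) * size

    cost_a = plan_cost(cardinality=1e6, size=8e7)   # hypothetical plan A stats
    cost_b = plan_cost(cardinality=2e6, size=4e7)   # hypothetical plan B stats
    print("prefer plan", "A" if cost_a < cost_b else "B")

As the mail points out, nothing in this formula distinguishes join
implementations or accounts for shuffle and sort cost.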


[Discuss] Language Interop for Apache Spark

2018-09-25 Thread tcondie
There seems to be some desire for third party language extensions for Apache
Spark. Some notable examples include:

*   C#/F# from project Mobius https://github.com/Microsoft/Mobius
*   Haskell from project sparkle https://github.com/tweag/sparkle
*   Julia from project Spark.jl https://github.com/dfdx/Spark.jl

Presently, Apache Spark supports Python and R via a tightly integrated
interop layer. It would seem that much of that existing interop layer could
be refactored into a clean surface for general (third party) language
bindings, such as the above mentioned. More specifically, could we
generalize the following modules:

1.  Deploy runners (e.g., PythonRunner and RRunner) 
2.  DataFrame Executors
3.  RDD operations?

The last item is questionable: integrating third-party language extensions at
the RDD level may be too heavyweight and unnecessary given the preference
towards the DataFrame abstraction.

The main goals of this effort would be:

1.  Provide a clean abstraction for third party language extensions, making
it easier to maintain the language extension as Apache Spark evolves
2.  Provide guidance to third party language authors on how a language
extension should be implemented
3.  Provide general reusable libraries that are not specific to any
language extension
4.  Open the door to developers that prefer alternative languages

-Tyson Condie 



Re: Python kubernetes spark 2.4 branch

2018-09-25 Thread Ilan Filonenko
Is this in reference to: https://issues.apache.org/jira/browse/SPARK-24736
?

On Tue, Sep 25, 2018 at 12:38 PM Yinan Li  wrote:

> Can you give more details on how you ran your app, did you build your own
> image, and which image are you using?
>
> On Tue, Sep 25, 2018 at 10:23 AM Garlapati, Suryanarayana (Nokia -
> IN/Bangalore)  wrote:
>
>> Hi,
>>
>> I am trying to run spark python testcases on k8s based on tag
>> spark-2.4-rc1. When the dependent files are passed through the --py-files
>> option, they are not getting resolved by the main python script. Please let
>> me know, is this a known issue?
>>
>>
>>
>> Regards
>>
>> Surya
>>
>>
>>
>


Accumulator issues in PySpark

2018-09-25 Thread Abdeali Kothari
I was trying to check out accumulators and see if I could use them for
anything.
I made a demo program and could not figure out how to add them up.

I found that I need to do a shuffle between the Python UDFs I am running for
the accumulators to be updated. Basically, if I do 5 withColumn() calls
with Python UDFs, I find the accumulator's value gets added only for the
last UDF I run before my action.

Here is a snippet to reproduce with Spark 2.3.2:

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark import AccumulatorParam

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
test_accum = spark.sparkContext.accumulator(0.0)

SHUFFLE = False

def main(data):
    print(">>> Check0", test_accum.value)

    def test(x):
        global test_accum
        test_accum += 1.0
        return x

    print(">>> Check1", test_accum.value)

    def test2(x):
        global test_accum
        test_accum += 100.0
        return x

    print(">>> Check2", test_accum.value)
    func_udf = F.udf(test, T.DoubleType())
    print(">>> Check3", test_accum.value)
    func_udf2 = F.udf(test2, T.DoubleType())
    print(">>> Check4", test_accum.value)

    data = data.withColumn("out1", func_udf(data["a"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check5", test_accum.value)
    data = data.withColumn("out2", func_udf2(data["b"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check6", test_accum.value)

    data.show()  # ACTION
    print(">>> Check7", test_accum.value)
    return data


df = spark.createDataFrame([
    [1.0, 2.0]
], schema=T.StructType([T.StructField(field_name, T.DoubleType(), True)
                        for field_name in ["a", "b"]]))

df2 = main(df)



Output 1 - with SHUFFLE=False
...
# >>> Check7 100.0


Output 2 - with SHUFFLE=True
...
# >>> Check7 101.0

Basically, it looks like the accumulator is updated only for the last UDF
before a shuffle-like operation. I am not sure if this is a bug or expected
behaviour.

Overall goal:
I'm trying to capture error messages from UDFs when they fail. The plan is to
wrap them in try/except, catch the error, save it to an accumulator, and
continue execution with `return None`.
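
A minimal sketch of that pattern (my own illustration, not from the original
mail; the failing UDF and the list-accumulator type are invented for the
example), subject to the same update-counting behaviour described above:

    from pyspark import AccumulatorParam
    from pyspark.sql import SparkSession, functions as F, types as T

    class ListParam(AccumulatorParam):
        def zero(self, value):
            return []
        def addInPlace(self, acc1, acc2):
            return acc1 + acc2

    spark = SparkSession.builder.getOrCreate()
    errors = spark.sparkContext.accumulator([], ListParam())

    def safe_divide(a, b):
        try:
            return a / b
        except Exception as e:       # catch, record, and keep going
            errors.add([str(e)])
            return None

    safe_divide_udf = F.udf(safe_divide, T.DoubleType())

    df = spark.createDataFrame([(1.0, 2.0), (1.0, 0.0)], ["a", "b"])
    df.withColumn("ratio", safe_divide_udf("a", "b")).show()  # action runs the UDFs
    print(errors.value)  # read on the driver only, after an action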


Re: [Discuss] Datasource v2 support for Kerberos

2018-09-25 Thread Ryan Blue
I agree with Wenchen that we'd remove the prefix when passing to a source,
so you could use the same "spark.yarn.keytab" option in both places. But I
think the problem is that "spark.yarn.keytab" still needs to be set, and it
clearly isn't in a shared namespace for catalog options. So I think we
would still need a solution for existing options. I'm more comfortable with
a white list for existing options that we want to maintain compatibility
with.
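
To illustrate the idea under discussion (a sketch only, not an existing Spark
API; the per-catalog prefix and the whitelist contents are assumptions):

    # Options under a hypothetical per-catalog prefix are handed to the source
    # with the prefix stripped; a small whitelist of existing global options is
    # forwarded unchanged for compatibility.
    WHITELIST = {"spark.yarn.keytab", "spark.yarn.principal"}

    def source_options(session_conf, prefix):
        opts = {k[len(prefix):]: v
                for k, v in session_conf.items() if k.startswith(prefix)}
        for key in WHITELIST & set(session_conf):
            opts[key] = session_conf[key]
        return opts

    conf = {
        "spark.sql.catalog.mycat.keytab": "/path/to/keytab",  # hypothetical key
        "spark.yarn.keytab": "/path/to/keytab",
    }
    print(source_options(conf, "spark.sql.catalog.mycat."))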

rb



On Mon, Sep 24, 2018 at 11:52 PM tigerquoll  wrote:

> To give some Kerberos-specific examples, the spark-submit args:
> --conf spark.yarn.keytab=path_to_keytab --conf
> spark.yarn.principal=princi...@realm.com
>
> are currently not passed through to the data sources.
>
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Python kubernetes spark 2.4 branch

2018-09-25 Thread Yinan Li
Can you give more details on how you ran your app, did you build your own
image, and which image are you using?

On Tue, Sep 25, 2018 at 10:23 AM Garlapati, Suryanarayana (Nokia -
IN/Bangalore)  wrote:

> Hi,
>
> I am trying to run spark python testcases on k8s based on tag
> spark-2.4-rc1. When the dependent files are passed through the --py-files
> option, they are not getting resolved by the main python script. Please let
> me know, is this a known issue?
>
>
>
> Regards
>
> Surya
>
>
>


Re: Support for Second level of concurrency

2018-09-25 Thread Sandeep Mahendru
Hey Jorn,

  Appreciate the prompt reply.

Yeah, that would surely work; we have tried a similar approach. The only
concern is that, to keep the solution low-latency, we want to avoid routing
through a message broker.

Regards,
Sandeep.

On Tue, Sep 25, 2018 at 12:53 PM Jörn Franke  wrote:

> What is the ultimate goal of this algorithm?  There could be already
> algorithms that can do this within Spark. You could also put a message on
> Kafka (or another broker) and have spark applications listen to them to
> trigger further computation. This would be also more controlled and can be
> done already now.
>
> > On 25. Sep 2018, at 17:31, sandeep mehandru  wrote:
> >
> > Hi Folks,
> >
> >   There is a use case where we are doing a large computation on two
> > large vectors. It is basically a scenario where we run a flatmap
> > operation on the left vector and run correlation logic by comparing it
> > with all the rows of the second vector. When this flatmap operation is
> > running on an executor, it compares row 1 from the left vector with all
> > rows of the second vector. The goal is that, from this flatmap
> > operation, we want to start another remote map operation that compares
> > a portion of the right vector's rows. This enables a second level of
> > concurrent operation, thereby increasing throughput and utilizing other
> > nodes. But to achieve this we need access to the Spark context from
> > within the flatmap operation.
> >
> > I have attached a snapshot describing the limitation.
> >
> > <http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3134/Concurrency_Snapshot.jpg>
> >
> > In simple words, this boils down to having access to a Spark context
> > from within an executor, so that the next level of map or concurrent
> > operations can be spun up on partitions on other machines. I have some
> > experience with other in-memory compute grid technologies like
> > Coherence and Hazelcast. These frameworks do allow triggering the next
> > level of concurrent operations from within a task being executed on one
> > node.
> >
> >
> > Regards,
> > Sandeep.
> >
> >
> >
> > --
> > Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >
>


Python kubernetes spark 2.4 branch

2018-09-25 Thread Garlapati, Suryanarayana (Nokia - IN/Bangalore)
Hi,
I am trying to run Spark Python test cases on k8s based on tag spark-2.4-rc1.
When the dependent files are passed through the --py-files option, they are not
resolved by the main Python script. Please let me know whether this is a known
issue.

Regards
Surya



Re: Support for Second level of concurrency

2018-09-25 Thread Jörn Franke
What is the ultimate goal of this algorithm? There may already be algorithms
that can do this within Spark. You could also put a message on Kafka (or
another broker) and have Spark applications listen to it to trigger further
computation. This would also be more controlled and can be done already today.
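
A minimal sketch of the "within Spark" option for the use case quoted below,
assuming the two vectors fit in DataFrames and the smaller one can be
broadcast (my illustration, not something proposed in the thread):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical stand-ins for the "left" and "right" vectors.
    left = spark.createDataFrame([(1, 0.2), (2, 0.9)], ["left_id", "left_val"])
    right = spark.createDataFrame([(10, 0.1), (11, 0.8)], ["right_id", "right_val"])

    # Broadcasting the smaller side lets every left row be compared with every
    # right row in a single job, with no need for a SparkContext inside a task.
    pairs = left.crossJoin(F.broadcast(right))
    pairs.withColumn("score", F.abs(pairs.left_val - pairs.right_val)).show()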

> On 25. Sep 2018, at 17:31, sandeep mehandru  
> wrote:
> 
> Hi Folks,
> 
>   There is a use case where we are doing a large computation on two large
> vectors. It is basically a scenario where we run a flatmap operation on the
> left vector and run correlation logic by comparing it with all the rows of
> the second vector. When this flatmap operation is running on an executor,
> it compares row 1 from the left vector with all rows of the second vector.
> The goal is that, from this flatmap operation, we want to start another
> remote map operation that compares a portion of the right vector's rows.
> This enables a second level of concurrent operation, thereby increasing
> throughput and utilizing other nodes. But to achieve this we need access to
> the Spark context from within the flatmap operation.
> 
> I have attached a snapshot describing the limitation.
> 
> <http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3134/Concurrency_Snapshot.jpg>
> 
> In simple words, this boils down to having access to a Spark context from
> within an executor, so that the next level of map or concurrent operations
> can be spun up on partitions on other machines. I have some experience with
> other in-memory compute grid technologies like Coherence and Hazelcast.
> These frameworks do allow triggering the next level of concurrent
> operations from within a task being executed on one node.
> 
> Regards,
> Sandeep.
> 
> 
> 
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Support for Second level of concurrency

2018-09-25 Thread Reynold Xin
That’s a pretty major architectural change and would be extremely difficult
to do at this stage.

On Tue, Sep 25, 2018 at 9:31 AM sandeep mehandru 
wrote:

> Hi Folks,
>
>    There is a use case where we are doing a large computation on two large
> vectors. It is basically a scenario where we run a flatmap operation on the
> left vector and run correlation logic by comparing it with all the rows of
> the second vector. When this flatmap operation is running on an executor,
> it compares row 1 from the left vector with all rows of the second vector.
> The goal is that, from this flatmap operation, we want to start another
> remote map operation that compares a portion of the right vector's rows.
> This enables a second level of concurrent operation, thereby increasing
> throughput and utilizing other nodes. But to achieve this we need access to
> the Spark context from within the flatmap operation.
>
> I have attached a snapshot describing the limitation.
>
> <http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3134/Concurrency_Snapshot.jpg>
>
> In simple words, this boils down to having access to a Spark context from
> within an executor, so that the next level of map or concurrent operations
> can be spun up on partitions on other machines. I have some experience with
> other in-memory compute grid technologies like Coherence and Hazelcast.
> These frameworks do allow triggering the next level of concurrent
> operations from within a task being executed on one node.
>
>
> Regards,
> Sandeep.
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
> --
--
excuse the brevity and lower case due to wrist injury


Support for Second level of concurrency

2018-09-25 Thread sandeep mehandru
Hi Folks,

   There is a use case where we are doing a large computation on two large
vectors. It is basically a scenario where we run a flatmap operation on the
left vector and run correlation logic by comparing it with all the rows of
the second vector. When this flatmap operation is running on an executor,
it compares row 1 from the left vector with all rows of the second vector.
The goal is that, from this flatmap operation, we want to start another
remote map operation that compares a portion of the right vector's rows.
This enables a second level of concurrent operation, thereby increasing
throughput and utilizing other nodes. But to achieve this we need access to
the Spark context from within the flatmap operation.

I have attached a snapshot describing the limitation.

<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3134/Concurrency_Snapshot.jpg>

In simple words, this boils down to having access to a Spark context from
within an executor, so that the next level of map or concurrent operations
can be spun up on partitions on other machines. I have some experience with
other in-memory compute grid technologies like Coherence and Hazelcast.
These frameworks do allow triggering the next level of concurrent
operations from within a task being executed on one node.


Regards,
Sandeep.



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [Discuss] Datasource v2 support for Kerberos

2018-09-25 Thread tigerquoll
To give some Kerberos-specific examples, the spark-submit args:
--conf spark.yarn.keytab=path_to_keytab --conf
spark.yarn.principal=princi...@realm.com

are currently not passed through to the data sources.





--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org