Re: Problems with update function in koalas - pyspark pandas.

2021-09-12 Thread Bjørn Jørgensen



https://issues.apache.org/jira/browse/SPARK-36722

https://github.com/apache/spark/pull/33968
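
For reference, a minimal sketch that reproduces the reported behaviour on an affected build (the column names and data below are made up for illustration; the report uses the long FD_OBJECT_* columns):

    import pyspark.pandas as ps

    psdf = ps.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, None, 30.0]})

    # Both Series are backed by the same anchor DataFrame, so Series.update()
    # calls combine_frames() on two frames with the same anchor and, before
    # the fix linked above, fails with:
    #   AssertionError: We don't need to combine. `this` and `that` are same.
    # Pandas semantics would instead leave "a" as [10.0, 2.0, 30.0].
    psdf["a"].update(psdf["b"])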

On 2021/09/11 10:06:50, Bjørn Jørgensen wrote: 
> Hi, I am using "from pyspark import pandas as ps" on a master build from yesterday. 
> I have some columns that I need to join into one. 
> In pandas I use update.
> 
>  
>  54  FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION       23 non-null     object 
>  55  FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION.P  24348 non-null     object
>  
> pd1['FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION'].update(pd1['FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION.P'])
>  
> ---------------------------------------------------------------------------
> AssertionError                            Traceback (most recent call last)
> /tmp/ipykernel_73/391781247.py in <module>
> ----> 1 pd1['FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION'].update(pd1['FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION.P'])
> 
> /opt/spark/python/pyspark/pandas/series.py in update(self, other)
>    4549             raise TypeError("'other' must be a Series")
>    4550 
> -> 4551         combined = combine_frames(self._psdf, other._psdf, how="leftouter")
>    4552 
>    4553         this_scol = combined["this"]._internal.spark_column_for(self._column_label)
> 
> /opt/spark/python/pyspark/pandas/utils.py in combine_frames(this, how, preserve_order_column, *args)
>     139     elif len(args) == 1 and isinstance(args[0], DataFrame):
>     140         assert isinstance(args[0], DataFrame)
> --> 141         assert not same_anchor(
>     142             this, args[0]
>     143         ), "We don't need to combine. `this` and `that` are same."
> 
> AssertionError: We don't need to combine. `this` and `that` are same.
> 
> 
> pd1.info()
> 
>  54  FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION       23 non-null     object 
>  55  FD_OBJECT_SUPPLIES_SERVICES_OBJECT_SUPPLY_SERVICE_ADDITIONAL_INFORMATION.P  24348 non-null     object
> 
> 




Re: Spark Structured Streaming Continuous Trigger on multiple sinks

2021-09-12 Thread Alex Ott
Just don't call .awaitTermination(), because it blocks execution of the
next line of code. You can assign the result of each .start() to its own
variable, or put them into a list/array.

And to wait until one of the streams finishes, use
spark.streams.awaitAnyTermination() or something like this
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)
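
A minimal PySpark sketch of that pattern (the sink options, checkpoint paths and the Mongo ForeachWriter below are placeholders, not taken from the original message; dataset1 and spark stand for the streaming DataFrame and session from the question):

    # Start both queries without blocking on either of them.
    kafka_query = (dataset1.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("topic", "events-out")                      # placeholder
        .option("checkpointLocation", "/tmp/chk/kafka")     # placeholder
        .trigger(continuous="1 second")
        .start())

    mongo_query = (dataset1.writeStream
        .foreach(mongo_foreach_writer)                      # placeholder ForeachWriter, as in the question
        .option("checkpointLocation", "/tmp/chk/mongo")     # placeholder
        .trigger(continuous="1 second")
        .start())

    # Blocks until any one of the active queries stops or fails.
    spark.streams.awaitAnyTermination()

The point is that neither .start() call blocks, so both queries run concurrently, and the single awaitAnyTermination() (or a loop over the query handles) keeps the driver alive.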
 

S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to multiple sinks. We are using Continuous Trigger and not Microbatch Trigger.

 S> 1. When we use the foreach method:
 S>    dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S>    dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S>    The first statement blocks the second one for obvious reasons, so this does not serve our purpose.
 S> 2. The next step for this problem would be to use foreachBatch. That is not supported in ContinuousMode.
 S> 3. The next step was to use something like this for both sinks:
 S>    dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 S>    dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S>    This does not work either. Only the 1st query works; the second one does not.

 S> Is there any solution to the problem of being able to write to multiple sinks in Continuous Trigger Mode using Structured Streaming?



-- 
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
