Re: [Spark Structured Streaming] Does Spark Structured Streaming currently support a sink to AWS Kinesis?

2023-02-16 Thread Vikas Kumar
Doesn't directly answer your question, but there are ways to do it in Scala and
PySpark - see if this helps:
https://repost.aws/questions/QUP_OJomilTO6oIgvK00VHEA/writing-data-to-kinesis-stream-from-py-spark
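For what it's worth, Spark itself does not ship a Kinesis sink for Structured
Streaming, so a common workaround is to write each micro-batch yourself with
foreachBatch and the AWS SDK. Below is a minimal Scala sketch of that idea; the
stream name, region, checkpoint path and JSON serialization are illustrative
assumptions, and the aws-java-sdk-kinesis dependency has to be on the classpath.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.sql.{DataFrame, SparkSession}

object KinesisSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kinesis-sink-sketch").getOrCreate()

    // Placeholder source; replace with your real streaming source.
    val input = spark.readStream.format("rate").load()

    val query = input.writeStream
      .option("checkpointLocation", "/tmp/kinesis-sink-checkpoint") // assumed path
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Serialize each row of the micro-batch to JSON and put it on the stream.
        batch.toJSON.rdd.foreachPartition { rows =>
          // One client per partition, created on the executor side.
          val kinesis = AmazonKinesisClientBuilder.standard()
            .withRegion("us-east-1") // assumed region
            .build()
          rows.foreach { json =>
            kinesis.putRecord(new PutRecordRequest()
              .withStreamName("my-stream") // assumed stream name
              .withPartitionKey(UUID.randomUUID().toString)
              .withData(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8))))
          }
        }
      }
      .start()

    query.awaitTermination()
  }
}
```

For real workloads you would batch records with putRecords and reuse the client,
but the shape of the solution stays the same.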

On Thu, Feb 16, 2023, 8:27 PM hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> 
> *Problems Description*
> I would like to writeStream data to AWS Kinesis with Spark
> Structured Streaming, but I cannot find a related connector jar that can be used.
> I want to check whether writing a stream to AWS Kinesis is fully supported. If you
> have any ideas, please let me know. I would appreciate your answer.
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


[Spark Structured Streaming] Does Spark Structured Streaming currently support a sink to AWS Kinesis?

2023-02-16 Thread hueiyuan su
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to


*Problems Description*
I would like to writeStream data to AWS Kinesis with Spark
Structured Streaming, but I cannot find a related connector jar that can be used.
I want to check whether writing a stream to AWS Kinesis is fully supported. If you
have any ideas, please let me know. I would appreciate your answer.

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Vikas Kumar
I think these 4 steps should help:

Use zip (arrays_zip)
Explode
withColumn (get each element out of the zipped array)
Drop the array column
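For example, a rough sketch of those steps in Scala (assuming the array columns
are named col1, col2 and col3 and all have the same length):

```scala
import org.apache.spark.sql.functions.{arrays_zip, col, explode}

val exploded = df
  .withColumn("zipped", arrays_zip(col("col1"), col("col2"), col("col3"))) // 1. zip the arrays position-wise
  .withColumn("zipped", explode(col("zipped")))                            // 2. explode: one row per position
  .withColumn("col1", col("zipped.col1"))                                  // 3. pull each element out of the struct
  .withColumn("col2", col("zipped.col2"))
  .withColumn("col3", col("zipped.col3"))
  .drop("zipped")                                                          // 4. drop the intermediate column

exploded.show()
```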

Thanks



On Thu, Feb 16, 2023, 2:18 PM sam smith  wrote:

> @Enrico Minack  I used arrays_zip to merge values
> into one row, and then used toJSON() to export the data.
> @Bjørn explode_outer didn't yield the expected results.
>
> Thanks anyway.
>
> On Thu, Feb 16, 2023 at 09:06, Enrico Minack  wrote:
>
>> You have to take each row and zip the lists; each element of the result
>> becomes one new row.
>>
>> So write a method that turns
>>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
>> into
>>   List(List("A","C","E"), List("B","D","null"),
>> List("null","null","null"))
>> and use flatmap with that method.
>>
>> In Scala, this would read:
>>
>> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
>> row.getSeq[String](2)).zipped.toIterable }.show()
>>
>> Enrico
>>
>>
>> On 14.02.23 at 22:54, sam smith wrote:
>>
>> Hello guys,
>>
>> I have the following dataframe:
>>
>> | col1             | col2             | col3                |
>> | ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |
>>
>>
>> I want to explode it to the following dataframe:
>>
>> | col1   | col2   | col3   |
>> | "A"    | "C"    | "E"    |
>> | "B"    | "D"    | "null" |
>> | "null" | "null" | "null" |
>>
>> How can I do that (preferably in Java) using the explode() method, knowing
>> that something like the following won't yield the correct output:
>>
>> for (String colName: dataset.columns())
>> dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
>>
>>
>>
>>


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread sam smith
@Enrico Minack  I used arrays_zip to merge values
into one row, and then used toJSON() to export the data.
@Bjørn explode_outer didn't yield the expected results.

Thanks anyway.
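In case it helps anyone reading this later, a minimal Scala sketch of that
arrays_zip + toJSON approach (the column names col1, col2, col3 are assumed):

```scala
import org.apache.spark.sql.functions.{arrays_zip, col, explode}

// One output row per array position, then each struct field back to its own column.
val exploded = df
  .select(explode(arrays_zip(col("col1"), col("col2"), col("col3"))).as("z"))
  .select("z.col1", "z.col2", "z.col3")

// Export every resulting row as a JSON string.
exploded.toJSON.show(truncate = false)
```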

On Thu, Feb 16, 2023 at 09:06, Enrico Minack  wrote:

> You have to take each row and zip the lists; each element of the result
> becomes one new row.
>
> So write a method that turns
>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
> into
>   List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
> and use flatmap with that method.
>
> In Scala, this would read:
>
> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
> row.getSeq[String](2)).zipped.toIterable }.show()
>
> Enrico
>
>
> On 14.02.23 at 22:54, sam smith wrote:
>
> Hello guys,
>
> I have the following dataframe:
>
> | col1             | col2             | col3                |
> | ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |
>
>
> I want to explode it to the following dataframe:
>
> | col1   | col2   | col3   |
> | "A"    | "C"    | "E"    |
> | "B"    | "D"    | "null" |
> | "null" | "null" | "null" |
>
> How can I do that (preferably in Java) using the explode() method, knowing
> that something like the following won't yield the correct output:
>
> for (String colName: dataset.columns())
> dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
>
>
>
>


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Bjørn Jørgensen
Use explode_outer() when rows have null values.
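A tiny Scala sketch of the difference, with an illustrative column name:

```scala
import org.apache.spark.sql.functions.{col, explode, explode_outer}

df.select(explode(col("col1")).as("value")).show()        // rows whose col1 is null or empty disappear
df.select(explode_outer(col("col1")).as("value")).show()  // such rows are kept, with a null value
```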

On Thu, Feb 16, 2023 at 16:48, Navneet wrote:

> I am not an expert; maybe try if this works:
> In order to achieve the desired output using the explode() method in
> Java, you can create a User-Defined Function (UDF) that zips the lists
> in each row and returns the resulting list. Here's an example
> implementation:
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.api.java.UDF1;
>
> // Zips the three array columns position-wise into [[col1[i], col2[i], col3[i]], ...].
> public class ZipRows implements UDF1<Row, List<List<String>>> {
>     @Override
>     public List<List<String>> call(Row row) {
>         List<String> list1 = row.getList(0);
>         List<String> list2 = row.getList(1);
>         List<String> list3 = row.getList(2);
>         List<List<String>> zipped = new ArrayList<>();
>         for (int i = 0; i < list1.size(); i++) {
>             List<String> sublist = new ArrayList<>();
>             sublist.add(list1.get(i));
>             sublist.add(list2.get(i));
>             sublist.add(list3.get(i));
>             zipped.add(sublist);
>         }
>         return zipped;
>     }
> }
> This UDF takes a Row as input, which contains the three lists in each
> row of the original DataFrame. It then zips these lists using a loop
> that creates a new sublist for each position in the lists. Finally, it
> returns the zipped list, whose Spark SQL type is array<array<string>>.
>
> You can then use this UDF in combination with explode() to achieve the
> desired output:
>
> import static org.apache.spark.sql.functions.*;
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.types.DataTypes;
>
> // assuming you have a Dataset<Row> called "df" and a SparkSession called "spark"
> spark.udf().register("zipRows", new ZipRows(),
>     DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType)));
>
> df.withColumn("zipped", callUDF("zipRows", struct(col("col1"), col("col2"), col("col3"))))
>     .selectExpr("explode(zipped) as zipped")
>     .selectExpr("zipped[0] as col1", "zipped[1] as col2", "zipped[2] as col3")
>     .show();
> This code first registers the UDF, then adds a new column called "zipped"
> by applying it to a struct of the "col1", "col2" and "col3" columns via
> callUDF(). It then uses explode() to explode the "zipped" column, and
> finally selects the three sub-elements of each zipped entry as separate
> columns using selectExpr(). The output should be the desired DataFrame.
>
>
>
> Regards,
> Navneet Kr
>
>
> On Thu, 16 Feb 2023 at 00:07, Enrico Minack 
> wrote:
> >
> > You have to take each row and zip the lists; each element of the result
> > becomes one new row.
> >
> > So write a method that turns
> >   Row(List("A","B","null"), List("C","D","null"),
> List("E","null","null"))
> > into
> >   List(List("A","C","E"), List("B","D","null"),
> List("null","null","null"))
> > and use flatmap with that method.
> >
> > In Scala, this would read:
> >
> > df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
> row.getSeq[String](2)).zipped.toIterable }.show()
> >
> > Enrico
> >
> >
> > On 14.02.23 at 22:54, sam smith wrote:
> >
> > Hello guys,
> >
> > I have the following dataframe:
> >
> > | col1             | col2             | col3                |
> > | ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |
> >
> >
> >
> > I want to explode it to the following dataframe:
> >
> > | col1   | col2   | col3   |
> > | "A"    | "C"    | "E"    |
> > | "B"    | "D"    | "null" |
> > | "null" | "null" | "null" |
> >
> >
> > How can I do that (preferably in Java) using the explode() method, knowing
> > that something like the following won't yield the correct output:
> >
> > for (String colName: dataset.columns())
> > dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
> >
> >
> >
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


How can I set a value of Location with CustomDataSource ?

2023-02-16 Thread Zhuolin Ji
Hi all,

I am sorry to bother you; I have a problem and I hope to get your help. I want
to use Spark to build a custom data source based on the ParquetDataSourceV2 class in
Spark v3.2.2, but I want to leave out the LOCATION field in the DDL and then set the
table and partition paths in code. How can I do that?

This is my DDL:
```

CREATE EXTERNAL TABLE `DEFAULT`.`USER_PARQUET_READ` (
  `id` BIGINT COMMENT '',
  `name` STRING COMMENT '')
USING com.kyligence.spark.datasources.DefaultSource
PARTITIONED BY (`asOfDate` DATE COMMENT 'PARTITIONED KEY')
LOCATION ''  -- this value is an empty string; I need to set it from Scala code. How can I do that?
TBLPROPERTIES (
  'transient_lastDdlTime' = '1641864235',
  'skip.header.line.count' = '1')

```

Thanks & Best regards


Zhuolin Ji
Software Engineer



Mobile: +1 312 451 6352
E-Mail: zhuolin...@kyligence.io
Address: 99 Almaden Blvd Ste 663, San Jose, CA 95113
www.kyligence.io




Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Navneet
I am not an expert; maybe try if this works:
In order to achieve the desired output using the explode() method in
Java, you can create a User-Defined Function (UDF) that zips the lists
in each row and returns the resulting list. Here's an example
implementation:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;

// Zips the three array columns position-wise into [[col1[i], col2[i], col3[i]], ...].
public class ZipRows implements UDF1<Row, List<List<String>>> {
    @Override
    public List<List<String>> call(Row row) {
        List<String> list1 = row.getList(0);
        List<String> list2 = row.getList(1);
        List<String> list3 = row.getList(2);
        List<List<String>> zipped = new ArrayList<>();
        for (int i = 0; i < list1.size(); i++) {
            List<String> sublist = new ArrayList<>();
            sublist.add(list1.get(i));
            sublist.add(list2.get(i));
            sublist.add(list3.get(i));
            zipped.add(sublist);
        }
        return zipped;
    }
}
This UDF takes a Row as input, which contains the three lists in each
row of the original DataFrame. It then zips these lists using a loop
that creates a new sublist for each position in the lists. Finally, it
returns the zipped list, whose Spark SQL type is array<array<string>>.

You can then use this UDF in combination with explode() to achieve the
desired output:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

// assuming you have a Dataset<Row> called "df" and a SparkSession called "spark"
spark.udf().register("zipRows", new ZipRows(),
    DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType)));

df.withColumn("zipped", callUDF("zipRows", struct(col("col1"), col("col2"), col("col3"))))
    .selectExpr("explode(zipped) as zipped")
    .selectExpr("zipped[0] as col1", "zipped[1] as col2", "zipped[2] as col3")
    .show();
This code first registers the UDF, then adds a new column called "zipped"
by applying it to a struct of the "col1", "col2" and "col3" columns via
callUDF(). It then uses explode() to explode the "zipped" column, and
finally selects the three sub-elements of each zipped entry as separate
columns using selectExpr(). The output should be the desired DataFrame.



Regards,
Navneet Kr


On Thu, 16 Feb 2023 at 00:07, Enrico Minack  wrote:
>
> You have to take each row and zip the lists; each element of the result
> becomes one new row.
>
> So write a method that turns
>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
> into
>   List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
> and use flatmap with that method.
>
> In Scala, this would read:
>
> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1), 
> row.getSeq[String](2)).zipped.toIterable }.show()
>
> Enrico
>
>
> On 14.02.23 at 22:54, sam smith wrote:
>
> Hello guys,
>
> I have the following dataframe:
>
> | col1             | col2             | col3                |
> | ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |
>
>
>
> I want to explode it to the following dataframe:
>
> | col1   | col2   | col3   |
> | "A"    | "C"    | "E"    |
> | "B"    | "D"    | "null" |
> | "null" | "null" | "null" |
>
>
> How can I do that (preferably in Java) using the explode() method, knowing that
> something like the following won't yield the correct output:
>
> for (String colName: dataset.columns())
> dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
>
>
>




Re: Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-16 Thread Mich Talebzadeh
You can try this

gsutil cp src/StructuredStream-on-gke.py gs://codes/

where you create a bucket on gcs called codes


Then in your spark-submit do


spark-submit --verbose \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --name  \
   --conf spark.kubernetes.driver.container.image=pyspark-example:0.1 \
   --conf spark.kubernetes.executor.container.image=pyspark-example:0.1 \
   gs://codes/StructuredStream-on-gke.py



HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 15 Feb 2023 at 21:17, karan alang  wrote:

> thnks, Mich .. let me check this
>
>
>
> On Wed, Feb 15, 2023 at 1:42 AM Mich Talebzadeh 
> wrote:
>
>>
>> It may help to check this article of mine
>>
>>
>> Spark on Kubernetes, A Practitioner’s Guide
>> 
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 15 Feb 2023 at 09:12, Mich Talebzadeh 
>> wrote:
>>
>>> Your submit command
>>>
>>> spark-submit --master k8s://https://34.74.22.140:7077 --deploy-mode
>>> cluster --name pyspark-example --conf 
>>> spark.kubernetes.container.image=pyspark-example:0.1
>>> --conf spark.kubernetes.file.upload.path=/myexample
>>> src/StructuredStream-on-gke.py
>>>
>>>
>>> pay attention to what it says
>>>
>>>
>>> --conf spark.kubernetes.file.upload.path
>>>
>>> That refers to your Python package on GCS storage not in the docker
>>> itself
>>>
>>>
>>> From
>>> https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
>>>
>>>
>>> "... The app jar file will be uploaded to the S3 and then when the
>>> driver is launched it will be downloaded to the driver pod and will be
>>> added to its classpath. Spark will generate a subdir under the upload path
>>> with a random name to avoid conflicts with spark apps running in parallel.
>>> User could manage the subdirs created according to his needs..."
>>>
>>>
>>> In your case it is gs not s3
>>>
>>>
>>> There is no point putting your python file in the docker image itself!
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 15 Feb 2023 at 07:46, karan alang  wrote:
>>>
 Hi Ye,

 This is the error I get when I don't set the
 spark.kubernetes.file.upload.path

 Any ideas on how to fix this ?

 ```

 Exception in thread "main" org.apache.spark.SparkException: Please
 specify spark.kubernetes.file.upload.path property.

 at
 org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:299)

 at
 org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:248)

 at
 scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

 at
 scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

 at
 scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

 at scala.collection.TraversableLike.map(TraversableLike.scala:238)

 at scala.collection.TraversableLike.map$(TraversableLike.scala:231)

 at scala.collection.AbstractTraversable.map(Traversable.scala:108)

 at
 org.apache.spark.deploy.k8s.KubernetesUtils$.uploadAndTransformFileUris(KubernetesUtils.scala:247)

 at
 org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.$an

Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Enrico Minack
You have to take each row and zip the lists; each element of the result
becomes one new row.


So write a method that turns
  Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
into
  List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
and use flatmap with that method.

In Scala, this would read:

df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1), 
row.getSeq[String](2)).zipped.toIterable }.show()
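For completeness, a self-contained version of that snippet (a sketch assuming
Spark 3.x with Scala 2.12; import spark.implicits._ supplies the encoder that
flatMap needs, and on Scala 2.13 lazyZip replaces .zipped):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("zip-explode").master("local[*]").getOrCreate()
import spark.implicits._

// Same sample data as in the question: one row, three array columns.
val df = Seq(
  (Seq("A", "B", "null"), Seq("C", "D", "null"), Seq("E", "null", "null"))
).toDF("col1", "col2", "col3")

df.flatMap { row =>
    (row.getSeq[String](0), row.getSeq[String](1), row.getSeq[String](2)).zipped.toIterable
  }
  .toDF("col1", "col2", "col3")
  .show()
```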


Enrico


On 14.02.23 at 22:54, sam smith wrote:

Hello guys,

I have the following dataframe:

| col1             | col2             | col3                |
| ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |

I want to explode it to the following dataframe:

| col1   | col2   | col3   |
| "A"    | "C"    | "E"    |
| "B"    | "D"    | "null" |
| "null" | "null" | "null" |


How can I do that (preferably in Java) using the explode() method,
knowing that something like the following won't yield the correct output:


for (String colName: dataset.columns())
dataset=dataset.withColumn(colName,explode(dataset.col(colName)));