GCP Dataproc - error in importing KafkaProducer

2022-02-17 Thread karan alang
Hello All,

I've a GCP Dataproc cluster, and I'm running a Spark StructuredStreaming
job on this.
I'm trying to use KafkaProducer to push aggregated data into a Kafka
topic, however when I import KafkaProducer
(from kafka import KafkaProducer),
it gives this error:

```

Traceback (most recent call last):
  File "/tmp/7e27e272e64b461dbdc2e5083dc23202/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 14, in <module>
    from kafka.producer import KafkaProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
    return '' % self.async
```

As part of the initialization actions, I'm installing the following:
---

pip install pypi
pip install kafka-python
pip install google-cloud-storage
pip install pandas

---
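
The traceback line "return '' % self.async" points at an old kafka client release
that still uses "async" as an attribute name; async became a reserved word in
Python 3.7, so importing such a release fails on the cluster's Python 3.8. As a
debugging sketch (this is an assumption about the cause, not a confirmed fix), the
snippet below checks which distributions and versions are actually installed on
the cluster, without triggering the failing import:

```
from importlib.metadata import version, PackageNotFoundError

# Both the old "kafka" distribution and "kafka-python" install the same "kafka"
# import package, so a stale one can shadow a current kafka-python release.
for dist in ("kafka-python", "kafka"):
    try:
        print(dist, version(dist))
    except PackageNotFoundError:
        print(dist, "not installed")
```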

Additional details in stackoverflow :
https://stackoverflow.com/questions/71169869/gcp-dataproc-getting-error-in-importing-kafkaproducer

Any ideas on what needs to be done to fix this?
tia!


Re: Cast int to string not possible?

2022-02-17 Thread Rico Bergmann
I found the reason why it did not work:

When returning the Spark data type I was calling new StringType(). When 
changing it to DataTypes.StringType it worked. 

Greets,
Rico. 

> On 17.02.2022 at 14:13, Gourav Sengupta wrote:
> 
> 
> Hi,
> 
> can you please post a screen shot of the exact CAST statement that you are 
> using? Did you use the SQL method mentioned by me earlier?
> 
> Regards,
> Gourav Sengupta
> 
>> On Thu, Feb 17, 2022 at 12:17 PM Rico Bergmann  wrote:
>> hi!
>> 
>> Casting another int column that is not a partition column fails with the 
>> same error. 
>> 
>> The Schema before the cast (column names are anonymized):
>> 
>> root
>> |-- valueObject: struct (nullable = true)
>> ||-- value1: string (nullable = true)
>> ||-- value2: string (nullable = true)
>> ||-- value3: timestamp (nullable = true)
>> ||-- value4: string (nullable = true)
>> |-- partitionColumn2: string (nullable = true)
>> |-- partitionColumn3: timestamp (nullable = true)
>> |-- partitionColumn1: integer (nullable = true)
>> 
>> I wanted to cast partitionColumn1 to String which gives me the described 
>> error. 
>> 
>> Best,
>> Rico
>> 
>> 
 On 17.02.2022 at 09:56, ayan guha wrote:
 
>>> 
>>> Can you try to cast any other Int field which is NOT a partition column? 
>>> 
 On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
  wrote:
 Hi,
 
 This appears interesting, casting INT to STRING has never been an issue 
 for me.
 
 Can you just help us with the output of : df.printSchema()  ?
 
 I prefer to use SQL, and the method I use for casting is: CAST(<<column name>> AS STRING) <<column name>>.
 
 Regards,
 Gourav
 
 
 
 
 
 
> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  
> wrote:
> Here is the code snippet:
> 
> var df = session.read().parquet(basepath);
> for(Column partition : partitionColumnsList){
>   df = df.withColumn(partition.getName(), 
> df.col(partition.getName()).cast(partition.getType()));
> }
> 
> Column is a class containing Schema Information, like for example the 
> name of the column and the data type of the column. 
> 
> Best, Rico.
> 
> > On 17.02.2022 at 03:17, Morven Huang wrote:
> > 
> > Hi Rico, you have any code snippet? I have no problem casting int to 
> > string.
> > 
> >> On Feb 17, 2022 at 12:26 AM, Rico Bergmann wrote:
> >> 
> >> Hi!
> >> 
> >> I am reading a partitioned dataFrame into spark using automatic type 
> >> inference for the partition columns. For one partition column the data 
> >> contains an integer, therefor Spark uses IntegerType for this column. 
> >> In general this is supposed to be a StringType column. So I tried to 
> >> cast this column to StringType. But this fails with AnalysisException 
> >> “cannot cast int to string”.
> >> 
> >> Is this a bug? Or is it really not allowed to cast an int to a string?
> >> 
> >> I’m using Spark 3.1.1
> >> 
> >> Best regards
> >> 
> >> Rico. 
> >> 
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> 
> > 
> > 
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
>>> -- 
>>> Best Regards,
>>> Ayan Guha


Encoders.STRING() causing performance problems in Java application

2022-02-17 Thread martin



Hello,

I am working on optimising the performance of a Java ML/NLP application 
based on Spark / SparkNLP. For prediction, I am applying a trained model 
on a Spark dataset which consists of one column with only one row. The 
dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Position for 'cf.content' not found in row

2022-02-17 Thread 潘明文
Hi,
   Could you help me with the issue below? Thanks!
  This is my source code:
SparkConf sparkConf = new SparkConf(true);
sparkConf.setAppName(ESTest.class.getName());

SparkSession spark = null;
sparkConf.setMaster("local[*]");
sparkConf.set("spark.cleaner.ttl", "3600");
sparkConf.set("es.nodes", "10.12.65.10");
sparkConf.set("es.port", "9200");
sparkConf.set("es.nodes.discovery", "false");
sparkConf.set("es.nodes.wan.only", "true");
spark = SparkSession.builder().config(sparkConf).getOrCreate();

Dataset<Row> df1 = JavaEsSparkSQL.esDF(spark, "index");
df1.printSchema();
df1.show();


elasticsearch index:


When I run the job, I get the issue below:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException:Position for 
'cf.content' not found in row; typically this is caused by a mapping 
inconsistency
at 
org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:60)
at 
org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaEsRowValueReader.scala:32)
at 
org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaEsRowValueReader.scala:118)
at 
org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:1047)
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:889)
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:602)
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:426)
... 34 more




Thanks.


writing a Dataframe (with one of the columns as struct) into Kafka

2022-02-17 Thread karan alang
Hello All,
I've a pyspark dataframe which I need to write to a Kafka topic.

Structure of the DF is :

root
 |-- window: struct (nullable = true)
 ||-- start: timestamp (nullable = false)
 ||-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

Currently, I'm looping over the rows, adding the data to a hashmap,
and then using KafkaProducer to push the data into the Kafka topic.

This does not seem very efficient, since I'm looping over each row,
and using extra space as well.
What is the best way to design/code this ?

Current Code :

def writeCountToKafka(df):
    if df.count() > 0:
        hm = {}
        df_pandas = df.toPandas()
        for _, row in df_pandas.iterrows():
            hm["window"] = [datetime.timestamp(row["window"]["start"]),
                            datetime.timestamp(row["window"]["end"])]
            hm["processedAlarmCnt"] = row["processedAlarmCnt"]
            hm["totalAlarmCnt"] = row["totalAlarmCnt"]

            # Python Kafka Producer
            kafka_producer.send(topic_count, json.dumps(hm).encode('utf-8'))
            kafka_producer.flush()
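
One commonly used alternative (a sketch under assumptions, not the original code;
the broker address and topic name below are placeholders) is to drop the pandas
loop and the separate KafkaProducer: serialize each row to JSON with
to_json(struct(...)) and let Spark's built-in Kafka sink write the whole
micro-batch in one go.

```
from pyspark.sql.functions import struct, to_json

def write_count_to_kafka(batch_df, batch_id):
    # Pack each row (including the nested "window" struct) into a JSON string in a
    # "value" column, then hand the micro-batch to the Kafka sink in a single write.
    (batch_df
        .select(to_json(struct("window", "processedAlarmCnt", "totalAlarmCnt")).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker-1:9092")  # placeholder
        .option("topic", "alarm-counts")                     # placeholder
        .save())
```

This keeps the work distributed on the executors instead of collecting every
micro-batch to the driver via toPandas().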


More details are in stackoverflow :

https://stackoverflow.com/questions/71166560/structured-streaming-writing-dataframe-into-kafka-row-by-row-dataframe-has-a

tia !


Re: Spark 3.2.1 in Google Kubernetes Version 1.19 or 1.21 - SparkSubmit Error

2022-02-17 Thread Mich Talebzadeh
Just create a directory as below on a GCP storage bucket

CODE_DIRECTORY_CLOUD="gs://spark-on-k8s/codes/"


Put your jar file there


gsutil cp /opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar $CODE_DIRECTORY_CLOUD


and reference it in your spark-submit:

  --conf spark.kubernetes.file.upload.path=file:///tmp \
  $CODE_DIRECTORY_CLOUD/spark-examples_2.12-3.2.1.jar

Where are you running spark-submit from?


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Feb 2022 at 18:24, Gnana Kumar  wrote:

> Though I have created the kubernetes RBAC as per Spark site in my GKE
> cluster,Im getting POD NAME null error.
>
> kubectl create serviceaccount spark
> kubectl create clusterrolebinding spark-role --clusterrole=edit
> --serviceaccount=default:spark --namespace=default
>
> On Thu, Feb 17, 2022 at 11:31 PM Gnana Kumar 
> wrote:
>
>> Hi Mich
>>
>> This is the latest error I'm stuck with. Please help me resolve this
>> issue.
>>
>> Exception in thread "main"
>> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create]
>>  for kind: [Pod]  with name: [null]  in namespace: [default]  failed.
>>
>> ~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit  \
>>--verbose \
>>--class org.apache.spark.examples.SparkPi \
>>--master k8s://${K8S_SERVER}:443 \
>>--deploy-mode cluster \
>>--name sparkBQ \
>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>--conf spark.network.timeout=300 \
>>--conf spark.executor.instances=3 \
>>--conf spark.kubernetes.allocation.batch.size=3 \
>>--conf spark.kubernetes.allocation.batch.delay=1 \
>>--conf spark.driver.cores=3 \
>>--conf spark.executor.cores=3 \
>>--conf spark.driver.memory=8092m \
>>--conf spark.executor.memory=8092m \
>>--conf spark.dynamicAllocation.enabled=true \
>>--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>--conf spark.kubernetes.driver.pod.name=spark-pi-driver \
>>--conf spark.kubernetes.driver.container.image=${SPARK_IMAGE}
>>  \
>>--conf spark.kubernetes.executor.container.image=
>> ${SPARK_IMAGE} \
>>
>>--conf 
>> spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>--conf spark.driver.extraJavaOptions=
>> "-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf spark.executor.extraJavaOptions=
>> "-Dio.netty.tryReflectionSetAccessible=true"\
>>--conf spark.kubernetes.file.upload.path=file:///tmp \
>>local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar
>>
>> Thanks
>> GK
>>
>> On Thu, Feb 17, 2022 at 6:55 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Gnana,
>>>
>>> That JAR file /home/gnana_kumar123/spark/spark-3.2.1-
>>> bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar, is not
>>> visible to the GKE cluster such that all nodes can read it. I suggest that
>>> you put it on gs:// bucket in GCP and access it from there.
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 17 Feb 2022 at 13:05, Gnana Kumar 
>>> wrote:
>>>
 Hi There,

 I'm getting below error though I pass --class and --jars values
 while submitting a spark job through Spark-Submit.
 Please help.

 Exception in thread "main" org.apache.spark.SparkException: Failed to
 get main class in JAR with error 'File file:/home/gnana_kumar123/spark/
  does not exist'.  Please specify one with --class.
 at
 org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:972)
 at
 org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
 at org.apache.spark.deploy.SparkSubmit.org
 $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
 at
 org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
 at
 org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
 at

Re: Using Avro file format with SparkSQL

2022-02-17 Thread Artemis User

Please try these two corrections:

1. The --packages isn't the right command line argument for
   spark-submit.  Please use --conf spark.jars.packages=your-package to
   specify Maven packages or define your configuration parameters in
   the spark-defaults.conf file
2. Please check the version number of your spark-avro jar file in
   MavenCentral and see if that version is indeed available and
   compatible with Spark 3.2.  The version we are currently using for
   Spark 3.2 is spark-avro_2.12-3.1.1.jar, not 3.2.0.

BTW, you do have to include the spark-avro lib as a custom jar file.  
The Spark 3.2 distribution includes only the avro libs, not the 
spark-avro lib.  Hope this helps...


-- ND


On 2/9/22 10:25 PM, Karanika, Anna wrote:

Hello,

I have been trying to use spark SQL’s operations that are related to 
the Avro file format,
e.g., stored as, save, load, in a Java class but they keep failing 
with the following stack trace:


Exception in thread "main" org.apache.spark.sql.AnalysisException: 
 Failed to find data source: avro. Avro is built-in but external data 
source module since Spark 2.4. Please deploy the application as per 
the deployment section of "Apache Avro Data Source Guide".
        at 
org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindAvroDataSourceError(QueryCompilationErrors.scala:1032)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
        at 
org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
        at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)

        at xsys.fileformats.SparkSQLvsAvro.main(SparkSQLvsAvro.java:57)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

For context, I am invoking spark-submit and adding arguments 
--packages org.apache.spark:spark-avro_2.12:3.2.0.

Yet, Spark responds as if the dependency was not added.
I am running spark-v3.2.0 (Scala 2.12).

On the other hand, everything works great with spark-shell or spark-sql.

I would appreciate any advice or feedback to get this running.

Thank you,
Anna



Re: Spark 3.2.1 in Google Kubernetes Version 1.19 or 1.21 - SparkSubmit Error

2022-02-17 Thread Gnana Kumar
Though I have created the kubernetes RBAC as per Spark site in my GKE
cluster, I'm getting a POD NAME null error.

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit
--serviceaccount=default:spark --namespace=default

On Thu, Feb 17, 2022 at 11:31 PM Gnana Kumar 
wrote:

> Hi Mich
>
> This is the latest error I'm stuck with. Please help me resolve this issue.
>
> Exception in thread "main"
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create]
>  for kind: [Pod]  with name: [null]  in namespace: [default]  failed.
>
> ~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit  \
>--verbose \
>--class org.apache.spark.examples.SparkPi \
>--master k8s://${K8S_SERVER}:443 \
>--deploy-mode cluster \
>--name sparkBQ \
>--conf spark.kubernetes.namespace=$NAMESPACE \
>--conf spark.network.timeout=300 \
>--conf spark.executor.instances=3 \
>--conf spark.kubernetes.allocation.batch.size=3 \
>--conf spark.kubernetes.allocation.batch.delay=1 \
>--conf spark.driver.cores=3 \
>--conf spark.executor.cores=3 \
>--conf spark.driver.memory=8092m \
>--conf spark.executor.memory=8092m \
>--conf spark.dynamicAllocation.enabled=true \
>--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>--conf spark.kubernetes.driver.pod.name=spark-pi-driver \
>--conf spark.kubernetes.driver.container.image=${SPARK_IMAGE} \
>--conf spark.kubernetes.executor.container.image=${SPARK_IMAGE}
>  \
>
>--conf 
> spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>--conf spark.driver.extraJavaOptions=
> "-Dio.netty.tryReflectionSetAccessible=true" \
>--conf spark.executor.extraJavaOptions=
> "-Dio.netty.tryReflectionSetAccessible=true"\
>--conf spark.kubernetes.file.upload.path=file:///tmp \
>local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar
>
> Thanks
> GK
>
> On Thu, Feb 17, 2022 at 6:55 PM Mich Talebzadeh 
> wrote:
>
>> Hi Gnana,
>>
>> That JAR file /home/gnana_kumar123/spark/spark-3.2.1-
>> bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar, is not
>> visible to the GKE cluster such that all nodes can read it. I suggest that
>> you put it on gs:// bucket in GCP and access it from there.
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 17 Feb 2022 at 13:05, Gnana Kumar 
>> wrote:
>>
>>> Hi There,
>>>
>>> I'm getting below error though I pass --class and --jars values
>>> while submitting a spark job through Spark-Submit.
>>> Please help.
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Failed to
>>> get main class in JAR with error 'File file:/home/gnana_kumar123/spark/
>>>  does not exist'.  Please specify one with --class.
>>> at
>>> org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:972)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
>>> at org.apache.spark.deploy.SparkSubmit.org
>>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>>
>>> ~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
>>>--master k8s://${K8S_SERVER}:443 \
>>>--deploy-mode cluster \
>>>--name sparkBQ \
>>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>>--conf spark.network.timeout=300 \
>>>--conf spark.executor.instances=3 \
>>>--conf spark.kubernetes.allocation.batch.size=3 \
>>>--conf spark.kubernetes.allocation.batch.delay=1 \
>>>--conf spark.driver.cores=3 \
>>>--conf spark.executor.cores=3 \
>>>--conf spark.driver.memory=8092m \
>>>--conf spark.executor.memory=8092m \
>>>--conf spark.dynamicAllocation.enabled=true 

Re: Spark 3.2.1 in Google Kubernetes Version 1.19 or 1.21 - SparkSubmit Error

2022-02-17 Thread Gnana Kumar
Hi Mich

This is the latest error I'm stuck with. Please help me resolve this issue.

Exception in thread "main"
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create]
 for kind: [Pod]  with name: [null]  in namespace: [default]  failed.

~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit  \
   --verbose \
   --class org.apache.spark.examples.SparkPi \
   --master k8s://${K8S_SERVER}:443 \
   --deploy-mode cluster \
   --name sparkBQ \
   --conf spark.kubernetes.namespace=$NAMESPACE \
   --conf spark.network.timeout=300 \
   --conf spark.executor.instances=3 \
   --conf spark.kubernetes.allocation.batch.size=3 \
   --conf spark.kubernetes.allocation.batch.delay=1 \
   --conf spark.driver.cores=3 \
   --conf spark.executor.cores=3 \
   --conf spark.driver.memory=8092m \
   --conf spark.executor.memory=8092m \
   --conf spark.dynamicAllocation.enabled=true \
   --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
   --conf spark.kubernetes.driver.pod.name=spark-pi-driver \
   --conf spark.kubernetes.driver.container.image=${SPARK_IMAGE} \
   --conf spark.kubernetes.executor.container.image=${SPARK_IMAGE} \
   --conf
spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.driver.extraJavaOptions=
"-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.executor.extraJavaOptions=
"-Dio.netty.tryReflectionSetAccessible=true"\
   --conf spark.kubernetes.file.upload.path=file:///tmp \
   local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar

Thanks
GK

On Thu, Feb 17, 2022 at 6:55 PM Mich Talebzadeh 
wrote:

> Hi Gnana,
>
> That JAR file /home/gnana_kumar123/spark/spark-3.2.1-
> bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar, is not visible
> to the GKE cluster such that all nodes can read it. I suggest that you put
> it on gs:// bucket in GCP and access it from there.
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Feb 2022 at 13:05, Gnana Kumar 
> wrote:
>
>> Hi There,
>>
>> I'm getting below error though I pass --class and --jars values
>> while submitting a spark job through Spark-Submit.
>> Please help.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Failed to get
>> main class in JAR with error 'File file:/home/gnana_kumar123/spark/  does
>> not exist'.  Please specify one with --class.
>> at
>> org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:972)
>> at
>> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
>> at org.apache.spark.deploy.SparkSubmit.org
>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
>> at
>> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>> at
>> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>> at
>> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>> at
>> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> ~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
>>--master k8s://${K8S_SERVER}:443 \
>>--deploy-mode cluster \
>>--name sparkBQ \
>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>--conf spark.network.timeout=300 \
>>--conf spark.executor.instances=3 \
>>--conf spark.kubernetes.allocation.batch.size=3 \
>>--conf spark.kubernetes.allocation.batch.delay=1 \
>>--conf spark.driver.cores=3 \
>>--conf spark.executor.cores=3 \
>>--conf spark.driver.memory=8092m \
>>--conf spark.executor.memory=8092m \
>>--conf spark.dynamicAllocation.enabled=true \
>>--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>--conf spark.kubernetes.driver.container.image=${SPARK_IMAGE}
>>  \
>>--conf spark.kubernetes.executor.container.image=
>> ${SPARK_IMAGE} \
>>
>>--conf 
>> spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>--conf spark.driver.extraJavaOptions=
>> "-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf 

Re: StructuredStreaming - foreach/foreachBatch

2022-02-17 Thread Gourav Sengupta
Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

The book from Dr. Zaharia on SPARK does a fantastic job in explaining the
fundamental thinking behind these concepts.
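
For a quick side-by-side, here is a minimal sketch (function and variable names
are illustrative, not taken from this thread): foreach receives one Row at a time,
while foreachBatch receives the whole micro-batch as a DataFrame plus a batch id.

```
def process_row(row):
    # called once per row; row is a pyspark.sql.Row
    print(row.asDict())

def process_batch(batch_df, batch_id):
    # called once per micro-batch; batch_df is an ordinary DataFrame
    print(f"batch {batch_id}: {batch_df.count()} rows")

# row-at-a-time sink
query = streaming_df.writeStream.foreach(process_row).start()

# or, micro-batch-at-a-time sink
# query = streaming_df.writeStream.foreachBatch(process_batch).start()
```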


Regards,
Gourav Sengupta



On Wed, Feb 9, 2022 at 8:51 PM karan alang  wrote:

> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang
>
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh 
> wrote:
>
>> BTW you can check this Linkedin article of mine on Processing Change
>> Data Capture with Spark Structured Streaming
>> 
>>
>>
>> It covers the concept of triggers including trigger(once = True) or
>> one-time batch in Spark Structured Streaming
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:
>>
>>> Thanks, Mich .. that worked fine!
>>>
>>>
>>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 read below

 """
"foreach" performs custom write logic on each row and
 "foreachBatch" performs custom write logic on each micro-batch through
 SendToBigQuery function
 foreachBatch(SendToBigQuery) expects 2 parameters,
 first: micro-batch as DataFrame or Dataset and second: unique id for each
 batch --> batchId
Using foreachBatch, we write each micro batch to storage
 defined in our custom logic. In this case, we store the output of our
 streaming application to Google BigQuery table.
Note that we are appending data and column "rowkey" is
 defined as UUID so it can be used as the primary key
 """
 result = streamingDataFrame.select( \
  col("parsed_value.rowkey").alias("rowkey") \
, col("parsed_value.ticker").alias("ticker") \
, col("parsed_value.timeissued").alias("timeissued")
 \
, col("parsed_value.price").alias("price")). \
  writeStream. \
  outputMode('append'). \
  option("truncate", "false"). \
  foreachBatch(SendToBigQuery). \
  trigger(processingTime='2 seconds'). \
  start()

 now you define your function SendToBigQuery()


 def SendToBigQuery(df, batchId):

 if len(df.take(1)) > 0:

 df.printSchema()

 print(f"""batchId is {batchId}""")

 rows = df.count()

 print(f""" Total records processed in this run = {rows}""")

 ..

 else:

 print("DataFrame is empty")

 *HTH*


view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:

> Hello All,
>
> I'm using StructuredStreaming to read data from Kafka, and need to do
> transformation on each individual row.
>
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is
> used ?
>
> Also, when I use foreachBatch, seems the BatchId is available in the
> function called ? How do I access individual rows ?
>
> Details are in stackoverflow :
>
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>
> What is the best approach for this use-case ?
>
> tia!
>



Re: Spark 3.2.1 in Google Kubernetes Version 1.19 or 1.21 - SparkSubmit Error

2022-02-17 Thread Mich Talebzadeh
Hi Gnana,

That JAR file /home/gnana_kumar123/spark/spark-3.2.1-
bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar, is not visible
to the GKE cluster such that all nodes can read it. I suggest that you put
it on gs:// bucket in GCP and access it from there.


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Feb 2022 at 13:05, Gnana Kumar  wrote:

> Hi There,
>
> I'm getting below error though I pass --class and --jars values
> while submitting a spark job through Spark-Submit.
> Please help.
>
> Exception in thread "main" org.apache.spark.SparkException: Failed to get
> main class in JAR with error 'File file:/home/gnana_kumar123/spark/  does
> not exist'.  Please specify one with --class.
> at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:972)
> at
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> ~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
>--master k8s://${K8S_SERVER}:443 \
>--deploy-mode cluster \
>--name sparkBQ \
>--conf spark.kubernetes.namespace=$NAMESPACE \
>--conf spark.network.timeout=300 \
>--conf spark.executor.instances=3 \
>--conf spark.kubernetes.allocation.batch.size=3 \
>--conf spark.kubernetes.allocation.batch.delay=1 \
>--conf spark.driver.cores=3 \
>--conf spark.executor.cores=3 \
>--conf spark.driver.memory=8092m \
>--conf spark.executor.memory=8092m \
>--conf spark.dynamicAllocation.enabled=true \
>--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>--conf spark.kubernetes.driver.container.image=${SPARK_IMAGE} \
>--conf spark.kubernetes.executor.container.image=${SPARK_IMAGE}
>  \
>
>--conf 
> spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>--conf spark.driver.extraJavaOptions=
> "-Dio.netty.tryReflectionSetAccessible=true" \
>--conf spark.executor.extraJavaOptions=
> "-Dio.netty.tryReflectionSetAccessible=true" \
>--class org.apache.spark.examples.SparkPi \
>
>--jars 
> /home/gnana_kumar123/spark/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar
>
> Thanks
> GK
>
>
>
> On Wed, Feb 16, 2022 at 11:11 PM Gnana Kumar 
> wrote:
>
>> Hi Mich
>>
>> Also I would like to run Spark nodes ( Master and Worker nodes in
>> Kubernetes) and then run my Java Spark application from a JAR file.
>>
>> Can you please let me know how to specify the JAR file and the MAIN class.
>>
>> Thanks
>> GK
>>
>> On Wed, Feb 16, 2022 at 10:36 PM Gnana Kumar 
>> wrote:
>>
>>> Hi Mich,
>>>
>>> I have built the image using the Dockerfile present
>>> in spark-3.2.1-bin-hadoop3.2.tgz.
>>>
>>> Also I have pushed the same image to my docker hub account ie.
>>> docker.io/gnanakumar123/spark3.2.1:latest
>>>
>>> I believe spark submit can pull image from docker hub when I run from
>>> GKE's Cloud Shell. Please confirm.
>>>
>>> Below is the command I'm running.
>>>
>>> ./spark-submit \
>>>   --master k8s://$K8S_SERVER \
>>>   --deploy-mode cluster \
>>>   --name spark-driver-pod \
>>>   --class org.apache.spark.examples.SparkPi \
>>>   --conf spark.executor.instances=2 \
>>>   --conf spark.kubernetes.driver.container.image=
>>> docker.io/gnanakumar123/spark3.2.1:latest \
>>>   --conf spark.kubernetes.executor.container.image=
>>> docker.io/gnanakumar123/spark3.2.1:latest \
>>>   --conf spark.kubernetes.container.image=
>>> docker.io/gnanakumar123/spark3.2.1:latest \
>>>   --conf spark.kubernetes.driver.pod.name=spark-driver-pod \
>>>   --conf spark.kubernetes.namespace=spark-demo \
>>>   --conf spark.kubernetes.container.image.pullPolicy=Never \
>>>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>> $SPARK_HOME/examples/jars/spark-examples_2.12-3.2.1.jar
>>>
>>> Thanks
>>> GK

Re: Cast int to string not possible?

2022-02-17 Thread Gourav Sengupta
Hi,

can you please post a screen shot of the exact CAST statement that you are
using? Did you use the SQL method mentioned by me earlier?

Regards,
Gourav Sengupta

On Thu, Feb 17, 2022 at 12:17 PM Rico Bergmann  wrote:

> hi!
>
> Casting another int column that is not a partition column fails with the
> same error.
>
> The Schema before the cast (column names are anonymized):
>
> root
>
> |-- valueObject: struct (nullable = true)
>
> ||-- value1: string (nullable = true)
>
> ||-- value2: string (nullable = true)
>
> ||-- value3: timestamp (nullable = true)
>
> ||-- value4: string (nullable = true)
>
> |-- partitionColumn2: string (nullable = true)
>
> |-- partitionColumn3: timestamp (nullable = true)
>
> |-- partitionColumn1: integer (nullable = true)
>
>
> I wanted to cast partitionColumn1 to String which gives me the described
> error.
>
>
> Best,
>
> Rico
>
>
>
> On 17.02.2022 at 09:56, ayan guha wrote:
>
> 
> Can you try to cast any other Int field which is NOT a partition column?
>
> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> This appears interesting, casting INT to STRING has never been an issue
>> for me.
>>
>> Can you just help us with the output of : df.printSchema()  ?
>>
>> I prefer to use SQL, and the method I use for casting is: CAST(<<column name>> AS STRING) <<column name>>.
>>
>> Regards,
>> Gourav
>>
>>
>>
>>
>>
>>
>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
>> wrote:
>>
>>> Here is the code snippet:
>>>
>>> var df = session.read().parquet(basepath);
>>> for(Column partition : partitionColumnsList){
>>>   df = df.withColumn(partition.getName(),
>>> df.col(partition.getName()).cast(partition.getType()));
>>> }
>>>
>>> Column is a class containing Schema Information, like for example the
>>> name of the column and the data type of the column.
>>>
>>> Best, Rico.
>>>
>>> > On 17.02.2022 at 03:17, Morven Huang wrote:
>>> >
>>> > Hi Rico, you have any code snippet? I have no problem casting int to
>>> string.
>>> >
>>> >> On Feb 17, 2022 at 12:26 AM, Rico Bergmann wrote:
>>> >>
>>> >> Hi!
>>> >>
>>> >> I am reading a partitioned dataFrame into spark using automatic type
>>> inference for the partition columns. For one partition column the data
>>> contains an integer, therefor Spark uses IntegerType for this column. In
>>> general this is supposed to be a StringType column. So I tried to cast this
>>> column to StringType. But this fails with AnalysisException “cannot cast
>>> int to string”.
>>> >>
>>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>>> >>
>>> >> I’m using Spark 3.1.1
>>> >>
>>> >> Best regards
>>> >>
>>> >> Rico.
>>> >>
>>> >> -
>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >>
>>> >
>>> >
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>> --
> Best Regards,
> Ayan Guha
>
>


Fwd: Spark 3.2.1 in Google Kubernetes Version 1.19 or 1.21 - SparkSubmit Error

2022-02-17 Thread Gnana Kumar
Hi There,

I'm getting below error though I pass --class and --jars values
while submitting a spark job through Spark-Submit.
Please help.

Exception in thread "main" org.apache.spark.SparkException: Failed to get
main class in JAR with error 'File file:/home/gnana_kumar123/spark/  does
not exist'.  Please specify one with --class.
at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:972)
at
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



~/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
   --master k8s://${K8S_SERVER}:443 \
   --deploy-mode cluster \
   --name sparkBQ \
   --conf spark.kubernetes.namespace=$NAMESPACE \
   --conf spark.network.timeout=300 \
   --conf spark.executor.instances=3 \
   --conf spark.kubernetes.allocation.batch.size=3 \
   --conf spark.kubernetes.allocation.batch.delay=1 \
   --conf spark.driver.cores=3 \
   --conf spark.executor.cores=3 \
   --conf spark.driver.memory=8092m \
   --conf spark.executor.memory=8092m \
   --conf spark.dynamicAllocation.enabled=true \
   --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
   --conf spark.kubernetes.driver.container.image=${SPARK_IMAGE} \
   --conf spark.kubernetes.executor.container.image=${SPARK_IMAGE} \
   --conf
spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.driver.extraJavaOptions=
"-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.executor.extraJavaOptions=
"-Dio.netty.tryReflectionSetAccessible=true" \
   --class org.apache.spark.examples.SparkPi \
   --jars
/home/gnana_kumar123/spark/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar

Thanks
GK



On Wed, Feb 16, 2022 at 11:11 PM Gnana Kumar 
wrote:

> Hi Mich
>
> Also I would like to run Spark nodes ( Master and Worker nodes in
> Kubernetes) and then run my Java Spark application from a JAR file.
>
> Can you please let me know how to specify the JAR file and the MAIN class.
>
> Thanks
> GK
>
> On Wed, Feb 16, 2022 at 10:36 PM Gnana Kumar 
> wrote:
>
>> Hi Mich,
>>
>> I have built the image using the Dockerfile present
>> in spark-3.2.1-bin-hadoop3.2.tgz.
>>
>> Also I have pushed the same image to my docker hub account ie.
>> docker.io/gnanakumar123/spark3.2.1:latest
>>
>> I believe spark submit can pull image from docker hub when I run from
>> GKE's Cloud Shell. Please confirm.
>>
>> Below is the command I'm running.
>>
>> ./spark-submit \
>>   --master k8s://$K8S_SERVER \
>>   --deploy-mode cluster \
>>   --name spark-driver-pod \
>>   --class org.apache.spark.examples.SparkPi \
>>   --conf spark.executor.instances=2 \
>>   --conf spark.kubernetes.driver.container.image=
>> docker.io/gnanakumar123/spark3.2.1:latest \
>>   --conf spark.kubernetes.executor.container.image=
>> docker.io/gnanakumar123/spark3.2.1:latest \
>>   --conf spark.kubernetes.container.image=
>> docker.io/gnanakumar123/spark3.2.1:latest \
>>   --conf spark.kubernetes.driver.pod.name=spark-driver-pod \
>>   --conf spark.kubernetes.namespace=spark-demo \
>>   --conf spark.kubernetes.container.image.pullPolicy=Never \
>>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>> $SPARK_HOME/examples/jars/spark-examples_2.12-3.2.1.jar
>>
>> Thanks
>> GK
>>
>>
>> On Mon, Feb 14, 2022 at 10:50 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi
>>>
>>>
>>> It is complaining about the missing driver container image. Does
>>> $SPARK_IMAGE point to a valid image in the GCP container registry?
>>>
>>> Example for a docker image for PySpark
>>>
>>>
>>> IMAGEDRIVER="eu.gcr.io/
>>> /spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages"
>>>
>>>
>>> spark-submit --verbose \
>>>
>>>--properties-file ${property_file} \
>>>
>>>--master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>>
>>>--deploy-mode cluster \
>>>
>>>--name sparkBQ \
>>>
>>>--py-files $CODE_DIRECTORY_CLOUD/spark_on_gke.zip \
>>>
>>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>>
>>>--conf spark.network.timeout=300 \
>>>
>>>--conf spark.executor.instances=$NEXEC \
>>>
>>>--conf 

Re: Cast int to string not possible?

2022-02-17 Thread Rico Bergmann
hi!

Casting another int column that is not a partition column fails with the same 
error. 

The Schema before the cast (column names are anonymized):

root
|-- valueObject: struct (nullable = true)
||-- value1: string (nullable = true)
||-- value2: string (nullable = true)
||-- value3: timestamp (nullable = true)
||-- value4: string (nullable = true)
|-- partitionColumn2: string (nullable = true)
|-- partitionColumn3: timestamp (nullable = true)
|-- partitionColumn1: integer (nullable = true)

I wanted to cast partitionColumn1 to String which gives me the described error. 

Best,
Rico


> On 17.02.2022 at 09:56, ayan guha wrote:
> 
> Can you try to cast any other Int field which is NOT a partition column? 
> 
> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta  
> wrote:
>> Hi,
>> 
>> This appears interesting, casting INT to STRING has never been an issue for 
>> me.
>> 
>> Can you just help us with the output of : df.printSchema()  ?
>> 
>> I prefer to use SQL, and the method I use for casting is: CAST(<<column name>> AS STRING) <<column name>>.
>> 
>> Regards,
>> Gourav
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  wrote:
>>> Here is the code snippet:
>>> 
>>> var df = session.read().parquet(basepath);
>>> for(Column partition : partitionColumnsList){
>>>   df = df.withColumn(partition.getName(), 
>>> df.col(partition.getName()).cast(partition.getType()));
>>> }
>>> 
>>> Column is a class containing Schema Information, like for example the name 
>>> of the column and the data type of the column. 
>>> 
>>> Best, Rico.
>>> 
>>> > On 17.02.2022 at 03:17, Morven Huang wrote:
>>> > 
>>> > Hi Rico, you have any code snippet? I have no problem casting int to 
>>> > string.
>>> > 
>>> >> On Feb 17, 2022 at 12:26 AM, Rico Bergmann wrote:
>>> >> 
>>> >> Hi!
>>> >> 
>>> >> I am reading a partitioned dataFrame into spark using automatic type 
>>> >> inference for the partition columns. For one partition column the data 
>>> >> contains an integer, therefor Spark uses IntegerType for this column. In 
>>> >> general this is supposed to be a StringType column. So I tried to cast 
>>> >> this column to StringType. But this fails with AnalysisException “cannot 
>>> >> cast int to string”.
>>> >> 
>>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>>> >> 
>>> >> I’m using Spark 3.1.1
>>> >> 
>>> >> Best regards
>>> >> 
>>> >> Rico. 
>>> >> 
>>> >> -
>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >> 
>>> > 
>>> > 
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> > 
>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> -- 
> Best Regards,
> Ayan Guha


Re: SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

2022-02-17 Thread Mich Talebzadeh
OK, that sounds reasonable.

In the code below

 #Aggregation code in Alarm call, which uses withWatermark
 def computeCount(df_processedAlarm, df_totalAlarm):
     processedAlarmCnt = None
     if df_processedAlarm.count() > 0:
         processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds")\
             .groupBy(
                 window(col("timestamp"), "1 minutes").alias("window")
             ).count()


It is more efficient to use the check below (take(1) only has to materialize a
single row, whereas count() scans the whole micro-batch):

     if len(df_processedAlarm.take(1)) > 0:
         processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds")\
             .groupBy(
                 window(col("timestamp"), "1 minutes").alias("window")
             ).count()
     else:
         print("DataFrame is empty")

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Feb 2022 at 06:33, karan alang  wrote:

> Hi Mich,
> the issue was related to incorrect, which is resolved.
>
> However, wrt your comment - 'OK sounds like your watermark is done
> outside of your processing.'
>
> In my use-case which primarily deals with syslogs, syslog is a string
> which needs to be parsed (with defensive coding built in to ensure records
> are in correct format), before it is fed to
> 3 different classes (AlarmProc being one of them) - where there is
> additional parsing + aggregation for specific types of logs.
> The way I'm handling this is by using -- foreachBatch(convertToDict) in
> the writeStream method, and the parsing + aggregation happens for the
> microbatch.
> foreachBatch - will wait for the parsing and aggregation to complete for
> the microbatch, and then proceed to do the same with the next microbatch.
>
> Since it involves a lot of parsing + aggregation, it requires more than a
> df.select() - hence the approach above is taken.
> From what I understand, the watermark is done within the processing ..
> since it is done per microbatch pulled with each trigger.
>
> Pls let me know if you have comments/suggestions on this approach.
>
> thanks,
> Karan Alang
>
>
> On Wed, Feb 16, 2022 at 12:52 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK sounds like your watermark is done outside of your processing.
>>
>> Check this
>>
>> # construct a streaming dataframe streamingDataFrame that
>> subscribes to topic temperature
>> streamingDataFrame = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['appName']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> .option("subscribe", "temperature") \
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))
>>
>>
>> resultM = streamingDataFrame.select( \
>>  col("parsed_value.rowkey").alias("rowkey") \
>>, col("parsed_value.timestamp").alias("timestamp") \
>>, col("parsed_value.temperature").alias("temperature"))
>> result = resultM. \
>>  withWatermark("timestamp", "5 minutes"). \
>>  groupBy(window(resultM.timestamp, "5 minutes", "5
>> minutes")). \
>>  avg('temperature'). \
>>  writeStream. \
>>  outputMode('complete'). \
>>  option("numRows", 1000). \
>>  option("truncate", "false"). \
>>  format('console'). \
>>  option('checkpointLocation', checkpoint_path). \
>>  queryName("temperature"). \
>>  start()
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 

Re: Cast int to string not possible?

2022-02-17 Thread ayan guha
Can you try to cast any other Int field which is NOT a partition column?

On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
wrote:

> Hi,
>
> This appears interesting, casting INT to STRING has never been an issue
> for me.
>
> Can you just help us with the output of : df.printSchema()  ?
>
> I prefer to use SQL, and the method I use for casting is: CAST(<<column name>> AS STRING) <<column name>>.
>
> Regards,
> Gourav
>
>
>
>
>
>
> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
> wrote:
>
>> Here is the code snippet:
>>
>> var df = session.read().parquet(basepath);
>> for(Column partition : partitionColumnsList){
>>   df = df.withColumn(partition.getName(),
>> df.col(partition.getName()).cast(partition.getType()));
>> }
>>
>> Column is a class containing Schema Information, like for example the
>> name of the column and the data type of the column.
>>
>> Best, Rico.
>>
>> > On 17.02.2022 at 03:17, Morven Huang wrote:
>> >
>> > Hi Rico, you have any code snippet? I have no problem casting int to
>> string.
>> >
>> >> On Feb 17, 2022 at 12:26 AM, Rico Bergmann wrote:
>> >>
>> >> Hi!
>> >>
>> >> I am reading a partitioned dataFrame into spark using automatic type
>> inference for the partition columns. For one partition column the data
>> contains an integer, therefor Spark uses IntegerType for this column. In
>> general this is supposed to be a StringType column. So I tried to cast this
>> column to StringType. But this fails with AnalysisException “cannot cast
>> int to string”.
>> >>
>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>> >>
>> >> I’m using Spark 3.1.1
>> >>
>> >> Best regards
>> >>
>> >> Rico.
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
Best Regards,
Ayan Guha


Re: Cast int to string not possible?

2022-02-17 Thread Gourav Sengupta
Hi,

This appears interesting, casting INT to STRING has never been an issue for
me.

Can you just help us with the output of : df.printSchema()  ?

I prefer to use SQL, and the method I use for casting is: CAST(<<column name>> AS STRING) <<column name>>.

Regards,
Gourav






On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  wrote:

> Here is the code snippet:
>
> var df = session.read().parquet(basepath);
> for(Column partition : partitionColumnsList){
>   df = df.withColumn(partition.getName(),
> df.col(partition.getName()).cast(partition.getType()));
> }
>
> Column is a class containing Schema Information, like for example the name
> of the column and the data type of the column.
>
> Best, Rico.
>
> > On 17.02.2022 at 03:17, Morven Huang wrote:
> >
> > Hi Rico, you have any code snippet? I have no problem casting int to
> string.
> >
> >> On Feb 17, 2022 at 12:26 AM, Rico Bergmann wrote:
> >>
> >> Hi!
> >>
> >> I am reading a partitioned dataFrame into spark using automatic type
> inference for the partition columns. For one partition column the data
> contains an integer, therefor Spark uses IntegerType for this column. In
> general this is supposed to be a StringType column. So I tried to cast this
> column to StringType. But this fails with AnalysisException “cannot cast
> int to string”.
> >>
> >> Is this a bug? Or is it really not allowed to cast an int to a string?
> >>
> >> I’m using Spark 3.1.1
> >>
> >> Best regards
> >>
> >> Rico.
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>