Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-05 Thread Stelios Philippou
Vajiha,

I don't see your query working as you hope it will.

spark.sql will execute a query at the database level.

To retrieve the temp view you need to go through the session, i.e.

session.sql("SELECT * FROM TEMP_VIEW")

You might need to retrieve the data into a collection and iterate over it
to do batch insertion using spark.sql("INSERT ...");
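
A minimal PySpark sketch of that suggestion (spark is assumed to be the same
SparkSession that registered TEMP_VIEW; the target table and column names are
placeholders, not from the thread):

rows = spark.sql("SELECT * FROM TEMP_VIEW").collect()  # pull the temp view back through the session

for row in rows:
    # one INSERT per row; real code would batch these statements
    spark.sql(f"INSERT INTO target_table VALUES ('{row.some_col}')")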

Hope this helps

Stelios


--
Hi Stelios Philippou,
I need to create a view in a PostgreSQL DB using PySpark code. I am able to
create a table through PySpark code, but I am unable to create a view.
I need to know whether I can create a view in a PostgreSQL database through
PySpark code or not. Thanks for your reply.

Pyspark Code:
df.createOrReplaceTempView("TEMP_VIEW")
spark.sql("CREATE VIEW TEMP1 AS SELECT * FROM TEMP_VIEW")

On Wed, 4 Jan 2023 at 15:10, Vajiha Begum S A 
wrote:

>
> I have tried to create a permanent view in a PostgreSQL DB through PySpark
> code, but I have received the below error message. Kindly help me to create
> a permanent view in the database. How shall I create a permanent view
> using PySpark code? Please do reply.
>
> *Error Message::*
> *Exception has occurred: Analysis Exception*
> Not allowed to create a permanent view `default`.`TEMP1` by referencing a
> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP
> VIEW
>
>
> Regards,
> Vajiha
> Research Analyst
> MW Solutions
>


Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-04 Thread Stelios Philippou
Vajiha,

I believe that you might be confusing things?
A permanent view in PSQL is a standard view.

A temp view or global view is a Spark view that is internal to Spark.
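
A minimal PySpark sketch of the distinction (spark and df are assumed; the
view names are placeholders):

# Spark-internal views: they live in the Spark application, not in PostgreSQL.
df.createOrReplaceTempView("TEMP_VIEW")      # visible only to this SparkSession
df.createGlobalTempView("GLOBAL_TEMP_VIEW")  # visible across sessions of this application

# A permanent view is an ordinary PostgreSQL view; as the error says, Spark's
# CREATE VIEW cannot be built on top of a temporary view.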

Can we get a snippet of the code, please?


On Wed, 4 Jan 2023 at 15:10, Vajiha Begum S A 
wrote:

>
> I have tried to create a permanent view in a PostgreSQL DB through PySpark
> code, but I have received the below error message. Kindly help me to create
> a permanent view in the database. How shall I create a permanent view
> using PySpark code? Please do reply.
>
> *Error Message::*
> *Exception has occurred: Analysis Exception*
> Not allowed to create a permanent view `default`.`TEMP1` by referencing a
> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP
> VIEW
>
>
> Regards,
> Vajiha
> Research Analyst
> MW Solutions
>


Re: Spark migration from 2.3 to 3.0.1

2023-01-02 Thread Stelios Philippou
Can we see your Spark configuration parameters?

The master URL is set, as per the Java API, with
new SparkConf().setMaster("local[*]")
according to where you want to run this.
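
A minimal Java sketch of setting it explicitly (the values are examples only;
in practice the master usually comes from spark-submit --master):

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// "local[*]" is just an example; use "yarn", "k8s://...", etc. as appropriate.
SparkConf conf = new SparkConf()
    .setAppName("test")
    .setMaster("local[*]");

SparkSession session = SparkSession.builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate();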

On Mon, 2 Jan 2023 at 14:38, Shrikant Prasad  wrote:

> Hi,
>
> I am trying to migrate one spark application from Spark 2.3 to 3.0.1.
>
> The issue can be reproduced using below sample code:
>
> object TestMain {
>
> val session =
> SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()
>
> def main(args: Array[String]): Unit = {
>
> import session.implicits._
> val a = session.sparkContext.parallelize(Array(("A",1),("B",2)))
>   .toDF("_c1","_c2").rdd.map(x => x(0).toString).collect()
> println(a.mkString("|"))
>
> }
> }
>
> It runs successfully in Spark 2.3 but fails with Spark 3.0.1 with below
> exception:
>
> Caused by: org.apache.spark.SparkException: A master URL must be set in
> your configuration
>
> at
> org.apache.spark.SparkContext.(SparkContext.scala:394)
>
> at
> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>
> at
> org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>
> at scala.Option.getOrElse(Option.scala:189)
>
> at
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>
> at TestMain$.(TestMain.scala:7)
>
> at TestMain$.(TestMain.scala)
>
>
> From the exception it appears that it tries to create spark session on
> executor also in Spark 3 whereas its not created again on executor in Spark
> 2.3.
>
> Can anyone help in identifying why there is this change in behavior?
>
> Thanks and Regards,
>
> Shrikant
>
> --
> Regards,
> Shrikant Prasad
>


Re: RDD block has negative value in Spark UI

2022-12-07 Thread Stelios Philippou
Already a known minor issue:

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-10141

On Wed, 7 Dec 2022, 15:09 K B M Kaala Subhikshan, <
kbmkaalasubhiks...@gmail.com> wrote:

> Could you explain why the RDD block has a negative value?
>


Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-02 Thread Stelios Philippou
Hi Kumba.

The SQL syntax is a bit different for
CREATE OR REPLACE TABLE


You can only do the following
CREATE TABLE IF NOT EXISTS



https://spark.apache.org/docs/3.3.0/sql-ref-syntax-ddl-create-table-datasource.html
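
A minimal sketch of that form, assuming a SparkSession named spark with the
Delta extensions configured; the table name, schema and path below are
placeholders, not from the thread:

spark.sql("""
    CREATE TABLE IF NOT EXISTS my_table (id INT, name STRING)
    USING DELTA
    LOCATION './output/delta/my_table'
""")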

On Tue, 2 Aug 2022 at 14:38, Sean Owen  wrote:

> I don't think "CREATE OR REPLACE TABLE" exists (in SQL?); this isn't a
> VIEW.
> Delete the path first; that's simplest.
>
> On Tue, Aug 2, 2022 at 12:55 AM Kumba Janga  wrote:
>
>> Thanks Sean! That was a simple fix. I changed it to "Create or Replace
>> Table" but now I am getting the following error. I am still researching
>> solutions but so far no luck.
>>
>> ParseException:
>> mismatched input '' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 
>> 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 
>> 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 
>> 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 
>> 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 
>> 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 
>> 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 
>> 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', 
>> DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 
>> 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 
>> 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 
>> 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 
>> 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 
>> 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 
>> 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'IF', 'IGNORE', 'IMPORT', 
>> 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 
>> 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 
>> 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 
>> 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 
>> 'MATCHED', 'MERGE', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 
>> 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 
>> 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 
>> 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 
>> 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 
>> 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 
>> 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 
>> 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 
>> 'ROWS', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 
>> 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 
>> 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 
>> 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 
>> 'TERMINATED', 'THEN', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 
>> 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 
>> 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 
>> 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 
>> 'WHEN', 'WHERE', 'WINDOW', 'WITH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 
>> 1, pos 23)
>>
>> == SQL ==
>> CREATE OR REPLACE TABLE
>>
>>
>> On Mon, Aug 1, 2022 at 8:32 PM Sean Owen  wrote:
>>
>>> Pretty much what it says? you are creating a table over a path that
>>> already has data in it. You can't do that without mode=overwrite at least,
>>> if that's what you intend.
>>>
>>> On Mon, Aug 1, 2022 at 7:29 PM Kumba Janga  wrote:
>>>


- Component: Spark Delta, Spark SQL
- Level: Beginner
- Scenario: Debug, How-to

 Python in Jupyter:

 import pyspark
 import pyspark.sql.functions

 from pyspark.sql import SparkSession
 spark = (
 SparkSession
 .builder
 .appName("programming")
 .master("local")
 .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
 .config("spark.sql.extensions", 
 "io.delta.sql.DeltaSparkSessionExtension")
 .config("spark.sql.catalog.spark_catalog", 
 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
 .config('spark.ui.port', '4050')
 .getOrCreate()

 )
 from delta import *

 string_20210609 = '''worked_date,worker_id,delete_flag,hours_worked
 2021-06-09,1001,Y,7
 2021-06-09,1002,Y,3.75
 2021-06-09,1003,Y,7.5
 2021-06-09,1004,Y,6.25'''

 rdd_20210609 = spark.sparkContext.parallelize(string_20210609.split('\n'))

 # FILES WILL SHOW UP ON THE LEFT UNDER THE FOLDER ICON IF YOU WANT TO BROWSE THEM
 OUTPUT_DELTA_PATH = './output/delta/'

 spark.sql('CREATE DATABASE IF 

Re: how to properly filter a dataset by dates ?

2022-06-17 Thread Stelios Philippou
dataset.where(to_date(dataset.col("Date"), "MM-dd-yyyy")
    .geq(to_date(lit("02-03-2012"), "MM-dd-yyyy")));

On Fri, 17 Jun 2022, 22:51 marc nicole,  wrote:

> dataset =
> dataset.where(to_date(dataset.col("Date"),"MM-dd-").geq("02-03-2012").cast("date"));
> ?
> This is returning an empty dataset.
>
> Le ven. 17 juin 2022 à 21:34, Stelios Philippou  a
> écrit :
>
>> You are already doing it once.
>> to_date the second part and don't forget to cast it as well
>>
>> On Fri, 17 Jun 2022, 22:08 marc nicole,  wrote:
>>
>>> should i cast to date the target date then? for example maybe:
>>>
>>> dataset =
>>>> dataset.where(to_date(dataset.col("Date"),"MM-dd-").geq("02-03-2012").cast("date"));
>>>> ?
>>>
>>> How to to do that ? comparing with dates?
>>>
>>>
>>> Le ven. 17 juin 2022 à 20:52, Sean Owen  a écrit :
>>>
>>>> Look at your query again. You are comparing dates to strings. The dates
>>>> widen back to strings.
>>>>
>>>> On Fri, Jun 17, 2022, 1:39 PM marc nicole  wrote:
>>>>
>>>>> I also tried:
>>>>>
>>>>> dataset =
>>>>>> dataset.where(to_date(dataset.col("Date"),"MM-dd-").geq("02-03-2012"));
>>>>>
>>>>>
>>>>> But it returned an empty dataset.
>>>>>
>>>>> Le ven. 17 juin 2022 à 20:28, Sean Owen  a écrit :
>>>>>
>>>>>> Same answer as last time - those are strings, not dates. 02-02-2015
>>>>>> as a string is before 02-03-2012.
>>>>>> You apply date function to dates, not strings.
>>>>>> You have to parse the dates properly, which was the problem in your
>>>>>> last email.
>>>>>>
>>>>>> On Fri, Jun 17, 2022 at 12:58 PM marc nicole 
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a dataset containing a column of dates, which I want to use
>>>>>>> for filtering. Nothing, from what I have tried, seems to return the 
>>>>>>> exact
>>>>>>> right solution.
>>>>>>> Here's my input:
>>>>>>>
>>>>>>> +------------+
>>>>>>> |    Date    |
>>>>>>> +------------+
>>>>>>> | 02-08-2019 |
>>>>>>> +------------+
>>>>>>> | 02-07-2019 |
>>>>>>> +------------+
>>>>>>> | 12-01-2019 |
>>>>>>> +------------+
>>>>>>> | 02-02-2015 |
>>>>>>> +------------+
>>>>>>> | 02-03-2012 |
>>>>>>> +------------+
>>>>>>> | 05-06-2018 |
>>>>>>> +------------+
>>>>>>> | 02-08-2022 |
>>>>>>> +------------+
>>>>>>>
>>>>>>> The code that i have tried (always giving missing dates in the
>>>>>>> result):
>>>>>>>
>>>>>>> dataset = dataset.filter( dataset.col("Date").geq("02-03-2012"));
>>>>>>>> // not showing the date of *02-02-2015*
>>>>>>>
>>>>>>>
>>>>>>> I tried to apply *date_trunc()* with the first parameter "day" but
>>>>>>> nothing.
>>>>>>>
>>>>>>> I have also compared a converted column (using *to_date()*) with a
>>>>>>> *literal *of the target date but always returning an empty dataset.
>>>>>>>
>>>>>>> How to do that in Java ?
>>>>>>>
>>>>>>>


Re: how to properly filter a dataset by dates ?

2022-06-17 Thread Stelios Philippou
You are already doing it once.
to_date the second part and don't forget to cast it as well

On Fri, 17 Jun 2022, 22:08 marc nicole,  wrote:

> should i cast to date the target date then? for example maybe:
>
> dataset =
>> dataset.where(to_date(dataset.col("Date"),"MM-dd-").geq("02-03-2012").cast("date"));
>> ?
>
> How to to do that ? comparing with dates?
>
>
> Le ven. 17 juin 2022 à 20:52, Sean Owen  a écrit :
>
>> Look at your query again. You are comparing dates to strings. The dates
>> widen back to strings.
>>
>> On Fri, Jun 17, 2022, 1:39 PM marc nicole  wrote:
>>
>>> I also tried:
>>>
>>> dataset =
 dataset.where(to_date(dataset.col("Date"),"MM-dd-").geq("02-03-2012"));
>>>
>>>
>>> But it returned an empty dataset.
>>>
>>> Le ven. 17 juin 2022 à 20:28, Sean Owen  a écrit :
>>>
 Same answer as last time - those are strings, not dates. 02-02-2015 as
 a string is before 02-03-2012.
 You apply date function to dates, not strings.
 You have to parse the dates properly, which was the problem in your
 last email.

 On Fri, Jun 17, 2022 at 12:58 PM marc nicole 
 wrote:

> Hello,
>
> I have a dataset containing a column of dates, which I want to use for
> filtering. Nothing, from what I have tried, seems to return the exact 
> right
> solution.
> Here's my input:
>
> +------------+
> |    Date    |
> +------------+
> | 02-08-2019 |
> +------------+
> | 02-07-2019 |
> +------------+
> | 12-01-2019 |
> +------------+
> | 02-02-2015 |
> +------------+
> | 02-03-2012 |
> +------------+
> | 05-06-2018 |
> +------------+
> | 02-08-2022 |
> +------------+
>
> The code that i have tried (always giving missing dates in the result):
>
> dataset = dataset.filter( dataset.col("Date").geq("02-03-2012"));  //
>> not showing the date of *02-02-2015*
>
>
> I tried to apply *date_trunc()* with the first parameter "day" but
> nothing.
>
> I have also compared a converted column (using *to_date()*) with a
> *literal *of the target date but always returning an empty dataset.
>
> How to do that in Java ?
>
>


Re: API Problem

2022-06-10 Thread Stelios Philippou
Sid,
Then the issue is in the data, in the way you are creating it for that
specific column.

call_to_cust_bulk_api(policyUrl,to_json(struct(*colsListToBePassed)))

Perhaps wrap that in a

lit(call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed))))

else you will need to start sending simpler data there to make sure
that the API works
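
For reference, a minimal PySpark sketch of building the JSON record from
columns with col() before handing it to the API (the column names a and b are
placeholders, not from the thread):

from pyspark.sql.functions import col, struct, to_json

payload_df = finalDF.withColumn("payload", to_json(struct(col("a"), col("b"))))
payload_df.select("payload").show(truncate=False)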


On Fri, 10 Jun 2022 at 12:15, Sid  wrote:

> Still,  it is giving the same error.
>
> On Fri, Jun 10, 2022 at 5:13 AM Sean Owen  wrote:
>
>> That repartition seems to do nothing? But yes the key point is use col()
>>
>> On Thu, Jun 9, 2022, 9:41 PM Stelios Philippou 
>> wrote:
>>
>>> Perhaps
>>>
>>>
>>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch
>>>
>>> To
>>>
>>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")
>>>
>>>
>>>
>>>
>>> On Thu, 9 Jun 2022, 22:32 Sid,  wrote:
>>>
>>>> Hi Experts,
>>>>
>>>> I am facing one problem while passing a column to the method.  The
>>>> problem is described in detail here:
>>>>
>>>>
>>>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>>>
>>>> TIA,
>>>> Sid
>>>>
>>>


Re: API Problem

2022-06-09 Thread Stelios Philippou
Perhaps

finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch

To

finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")




On Thu, 9 Jun 2022, 22:32 Sid,  wrote:

> Hi Experts,
>
> I am facing one problem while passing a column to the method.  The problem
> is described in detail here:
>
>
> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>
> TIA,
> Sid
>


Re: How to convert a Dataset<Row> to a Dataset<String>?

2022-06-06 Thread Stelios Philippou
Hi All,

Simple in Java as well.
You can get the Dataset<String> directly:

Dataset<String> encodedString = df.select("Column")
.where("")
.as(Encoders.STRING());
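
Putting both cases together, a minimal Java sketch (df is an existing
Dataset<Row>; the column name is a placeholder):

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Single string column: the encoder alone is enough.
Dataset<String> single = df.select("Column").as(Encoders.STRING());

// Several columns: turn each Row into a String first, then encode.
Dataset<String> joined = df.map(
    (MapFunction<Row, String>) row -> row.mkString(","),
    Encoders.STRING());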

On Mon, 6 Jun 2022 at 15:26, Christophe Préaud <
christophe.pre...@kelkoogroup.com> wrote:

> Hi Marc,
>
> I'm not very familiar with Spark on Java, but according to the doc,
> it should be:
> Encoder<String> stringEncoder = Encoders.STRING();
> dataset.as(stringEncoder);
>
> For the record, it is much simpler in Scala:
> dataset.as[String]
>
> Of course, this will work if your DataFrame only contains one column of
> type String, e.g.:
> val df = spark.read.parquet("Cyrano_de_Bergerac_Acte_V.parquet")
> df.printSchema
>
> root
>  |-- line: string (nullable = true)
>
> df.as[String]
>
> Otherwise, you will have to convert somehow the Row to a String, e.g. in
> Scala:
> case class Data(f1: String, f2: Int, f3: Long)
> val df = Seq(Data("a", 1, 1L), Data("b", 2, 2L), Data("c", 3, 3L),
> Data("d", 4, 4L), Data("e", 5, 5L)).toDF
> val ds = df.map(_.mkString(",")).as[String]
> ds.show
>
> +-----+
> |value|
> +-----+
> |a,1,1|
> |b,2,2|
> |c,3,3|
> |d,4,4|
> |e,5,5|
> +-----+
>
> Regards,
> Christophe.
>
> On 6/4/22 14:38, marc nicole wrote:
>
> Hi,
> How to convert a Dataset<Row> to a Dataset<String>?
> What i have tried is:
>
> List<String> list = dataset.as(Encoders.STRING()).collectAsList();
> Dataset<String> datasetSt = spark.createDataset(list, Encoders.STRING());
> // But this line raises a org.apache.spark.sql.AnalysisException: Try to
> map struct... to Tuple1, but failed as the number of fields does not line
> up
>
> Type of columns being String
> How to solve this?
>
>
>


Re: Unable to format timestamp values in pyspark

2022-05-30 Thread Stelios Philippou
Sid,

According to the error that I am seeing there, this is a date format
issue.

Text '5/1/2019 1:02:16' could not be parsed


But your time format is specified as

'M/dd/yyyy H:mm:ss'

You can see that the day is /1/, but your format uses dd, which
expects two digits.

Please try the following format and let us know:

'M/d/yyyy H:mm:ss'
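
A minimal runnable PySpark sketch of that fix (the yyyy part of the pattern is
assumed, since the archive stripped it; the sample row is made up for
illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("5/1/2019 1:02:16",)], ["event_time"])
df = df.withColumn("event_ts", to_timestamp("event_time", "M/d/yyyy H:mm:ss"))
df.filter(df.event_ts >= "2019-05-01").show(truncate=False)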





On Mon, 30 May 2022 at 11:05, Sid  wrote:

> Hi Team,
>
> I am able to convert to timestamp. However, when I try to filter out the
> records based on a specific value it gives an error as mentioned in the
> post. Could you please help me with this?
>
>
> https://stackoverflow.com/questions/72422897/unable-to-format-timestamp-in-pyspark/72423394#72423394
>
>
> Best Regards,
> Sid
>


Re: Unable to convert double values

2022-05-29 Thread Stelios Philippou
Hi Sid,

df = df.withColumn("annual_salary",
regexp_replace(col("annual_salary"), "\.", ""))

 The value 125.06 becomes 12506 which when cast to double is 12506.00


Have you tried without removing the . ?

df.withColumn("annual_salary",
round(col("annual_salary").cast("double"), 2)).show(truncate=False)
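
A minimal runnable sketch of that suggestion (PySpark; the sample value is
made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("125.06",)], ["annual_salary"])
# Cast straight to double and round; no need to strip the dot first.
df.withColumn("annual_salary", round(col("annual_salary").cast("double"), 2)) \
  .show(truncate=False)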




On Sun, 29 May 2022 at 20:25, Sid  wrote:

> Hi Team,
>
> I need help with the below problem:
>
>
> https://stackoverflow.com/questions/72422872/unable-to-format-double-values-in-pyspark?noredirect=1#comment127940175_72422872
>
>
> What am I doing wrong?
>
> Thanks,
> Siddhesh
>


Re: protobuf data as input to spark streaming

2022-04-06 Thread Stelios Philippou
Yes we are currently using it as such.
Code is in java. Will that work?

On Wed, 6 Apr 2022 at 00:51, Kiran Biswal  wrote:

> Hello Experts
>
> Has anyone used protobuf (proto3) encoded data (from kafka) as input
> source and been able to do spark structured streaming?
>
> I would appreciate if you can share any sample code/example
>
> Regards
> Kiran
>
>>


Re: add an auto_increment column

2022-02-07 Thread Stelios Philippou
https://stackoverflow.com/a/51854022/299676
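
A minimal PySpark sketch of adding such a 1..N column with row_number() over a
window ordered by amount (one common approach; the linked answer may differ in
detail, and df is the fruit/amount DataFrame from the thread):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

w = Window.orderBy(col("amount").desc())
df.withColumn("top", row_number().over(w)).show()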

On Tue, 8 Feb 2022 at 09:25, Stelios Philippou  wrote:

> This has the information that you require in order to add an extra column
> with a sequence to it.
>
>
> On Tue, 8 Feb 2022 at 09:11,  wrote:
>
>>
>> Hello Gourav
>>
>>
>> As you see here orderBy has already give the solution for "equal
>> amount":
>>
>> >>> df =
>> >>>
>> sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
>>
>> >>> df.select("*").orderBy("amount",ascending=False).show()
>> +------+------+
>> | fruit|amount|
>> +------+------+
>> |cherry|     5|
>> | apple|     3|
>> |tomato|     3|
>> |orange|     2|
>> +------+------+
>>
>>
>> I want to add a column at the right whose name is "top" and the value
>> auto_increment from 1 to N.
>>
>> Thank you.
>>
>>
>>
>> On 08/02/2022 13:52, Gourav Sengupta wrote:
>> > Hi,
>> >
>> > sorry once again, will try to understand the problem first :)
>> >
>> > As we can clearly see that the initial responses were incorrectly
>> > guessing the solution to be monotonically_increasing function
>> >
>> > What if there are two fruits with equal amount? For any real life
>> > application, can we understand what are trying to achieve by the
>> > rankings?
>> >
>> > Regards,
>> > Gourav Sengupta
>> >
>> > On Tue, Feb 8, 2022 at 4:22 AM ayan guha  wrote:
>> >
>> >> For this req you can rank or dense rank.
>> >>
>> >> On Tue, 8 Feb 2022 at 1:12 pm,  wrote:
>> >>
>> >>> Hello,
>> >>>
>> >>> For this query:
>> >>>
>> >>>>>> df.select("*").orderBy("amount",ascending=False).show()
>> >>> +------+------+
>> >>> | fruit|amount|
>> >>> +------+------+
>> >>> |tomato|     9|
>> >>> | apple|     6|
>> >>> |cherry|     5|
>> >>> |orange|     3|
>> >>> +------+------+
>> >>>
>> >>> I want to add a column "top", in which the value is: 1,2,3...
>> >>> meaning
>> >>> top1, top2, top3...
>> >>>
>> >>> How can I do it?
>> >>>
>> >>> Thanks.
>> >>>
>> >>> On 07/02/2022 21:18, Gourav Sengupta wrote:
>> >>>> Hi,
>> >>>>
>> >>>> can we understand the requirement first?
>> >>>>
>> >>>> What is that you are trying to achieve by auto increment id? Do
>> >>> you
>> >>>> just want different ID's for rows, or you may want to keep track
>> >>> of
>> >>>> the record count of a table as well, or do you want to do use
>> >>> them for
>> >>>> surrogate keys?
>> >>>>
>> >>>> If you are going to insert records multiple times in a table,
>> >>> and
>> >>>> still have different values?
>> >>>>
>> >>>> I think without knowing the requirements all the above
>> >>> responses, like
>> >>>> everything else where solutions are reached before understanding
>> >>> the
>> >>>> problem, has high chances of being wrong.
>> >>>>
>> >>>> Regards,
>> >>>> Gourav Sengupta
>> >>>>
>> >>>> On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj
>> >>> 
>> >>>> wrote:
>> >>>>
>> >>>>> Monotonically_increasing_id() will give the same functionality
>> >>>>>
>> >>>>> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
>> >>>>>
>> >>>>>> For a dataframe object, how to add a column who is
>> >>> auto_increment
>> >>>>>> like
>> >>>>>> mysql's behavior?
>> >>>>>>
>> >>>>>> Thank you.
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >> --
>> >> Best Regards,
>> >> Ayan Guha
>>
>>
>>


Re: add an auto_increment column

2022-02-07 Thread Stelios Philippou
This has the information that you require in order to add an extra column
with a sequence to it.


On Tue, 8 Feb 2022 at 09:11,  wrote:

>
> Hello Gourav
>
>
> As you see here orderBy has already give the solution for "equal
> amount":
>
> >>> df =
> >>>
> sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +------+------+
> | fruit|amount|
> +------+------+
> |cherry|     5|
> | apple|     3|
> |tomato|     3|
> |orange|     2|
> +------+------+
>
>
> I want to add a column at the right whose name is "top" and the value
> auto_increment from 1 to N.
>
> Thank you.
>
>
>
> On 08/02/2022 13:52, Gourav Sengupta wrote:
> > Hi,
> >
> > sorry once again, will try to understand the problem first :)
> >
> > As we can clearly see that the initial responses were incorrectly
> > guessing the solution to be monotonically_increasing function
> >
> > What if there are two fruits with equal amount? For any real life
> > application, can we understand what are trying to achieve by the
> > rankings?
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Tue, Feb 8, 2022 at 4:22 AM ayan guha  wrote:
> >
> >> For this req you can rank or dense rank.
> >>
> >> On Tue, 8 Feb 2022 at 1:12 pm,  wrote:
> >>
> >>> Hello,
> >>>
> >>> For this query:
> >>>
> >> df.select("*").orderBy("amount",ascending=False).show()
> >>> +------+------+
> >>> | fruit|amount|
> >>> +------+------+
> >>> |tomato|     9|
> >>> | apple|     6|
> >>> |cherry|     5|
> >>> |orange|     3|
> >>> +------+------+
> >>>
> >>> I want to add a column "top", in which the value is: 1,2,3...
> >>> meaning
> >>> top1, top2, top3...
> >>>
> >>> How can I do it?
> >>>
> >>> Thanks.
> >>>
> >>> On 07/02/2022 21:18, Gourav Sengupta wrote:
>  Hi,
> 
>  can we understand the requirement first?
> 
>  What is that you are trying to achieve by auto increment id? Do
> >>> you
>  just want different ID's for rows, or you may want to keep track
> >>> of
>  the record count of a table as well, or do you want to do use
> >>> them for
>  surrogate keys?
> 
>  If you are going to insert records multiple times in a table,
> >>> and
>  still have different values?
> 
>  I think without knowing the requirements all the above
> >>> responses, like
>  everything else where solutions are reached before understanding
> >>> the
>  problem, has high chances of being wrong.
> 
>  Regards,
>  Gourav Sengupta
> 
>  On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj
> >>> 
>  wrote:
> 
> > Monotonically_increasing_id() will give the same functionality
> >
> > On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
> >
> >> For a dataframe object, how to add a column who is
> >>> auto_increment
> >> like
> >> mysql's behavior?
> >>
> >> Thank you.
> >>
> >>
> >
> 
> >>>
> >>
> >> --
> >> Best Regards,
> >> Ayan Guha
>
>
>


Re: Unable to use WriteStream to write to delta file.

2021-12-17 Thread Stelios Philippou
Hi Abhinav,

Using ReadStream or Read will not matter.

The following error
java.lang.NoSuchMethodError:
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(

states that you are using a different version of Spark somewhere in your
project, or that you are using an older component.

Please check your Spark binaries as well as your pom to confirm that you are
indeed using the same versions.

On Fri, 17 Dec 2021 at 15:11, Abhinav Gundapaneni
 wrote:

> Hello Spark community,
>
>
>
> I’m using Apache spark(version 3.2) to read a CSV file to a dataframe
> using ReadStream, process the dataframe and write the dataframe to Delta
> file using WriteStream. I’m getting a failure during the WriteStream
> process. I’m trying to run the script locally in my windows 11 machine.
> Below is the stack trace of the error I’m facing. Please let me know if
> there’s anything that I’m missing.
>
>
>
>
>
>
>
>
>
>
>
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V
>
> at
> org.apache.spark.sql.delta.schema.SchemaUtils$.checkFieldNames(SchemaUtils.scala:958)
>
>
> at
> org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata(OptimisticTransaction.scala:216)
>
> at
> org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata$(OptimisticTransaction.scala:214)
>
> at
> org.apache.spark.sql.delta.OptimisticTransaction.verifyNewMetadata(OptimisticTransaction.scala:80)
>
> at
> org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:208)
>
> at
> org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:195)
>
> at
> org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:80)
>
> at
> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:101)
>
> at
> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:62)
>
> at
> org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)
>
>
> at
> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:59)
>
> at
> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:50)
>
> at
> org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)
>
>
> at
> org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:80)
>
>
> at
> org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
>
> at
> org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
>
> at
> org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
>
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>
>   

Re: Running spark Kafka streaming jo in Azure HDInsight

2021-10-06 Thread Stelios Philippou
Hi Favas,

The error states that you are using different library versions.

Exception in thread "streaming-start" java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V



Keep in mind that Spark uses its internal libraries for the majority of
this, so the versions must be aligned between Spark and your code.

Can you verify that your HDI version is indeed 3.0?

Also, how are you submitting the job?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas <
favas.muham...@expeedsoftware.com> wrote:

> Hi,
>
>
>
> I am facing some dependency issue in running a spark streaming job in
> Azure HDInsight. The job is connecting to a kafka broker which is hosted in
> a LAN and has public IP access to it.
>
>
>
> Spark job pom.xml setup – Spark version 3.0.0, Scala version 2.12
>
>
>
> 
> <dependency>
>   <groupId>org.scala-lang</groupId>
>   <artifactId>scala-library</artifactId>
>   <version>2.12.12</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-common</artifactId>
>   <version>2.7.4</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
>
>
>
> HDInsight version - Spark 3.0 (HDI 4.0)
>
> I am using Livy API to start job in azure remotely. Below is the list of
> files passed in “jars” option in livy
>
>
>
> kafka-clients-2.7.0.jar
> 
> ,
>
> spark-streaming-kafka-0-10_2.12-3.0.0.jar
> 
> ,
>
> spark-token-provider-kafka-0-10_2.12-3.0.0.jar
> 
>
>
>
> The job is starting in azure spark cluster, but it is not receiving data
> from my kafka broker. Here is the error I am getting
>
>
>
> Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
>
> at 
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
>
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>
> at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>
> at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>
> at scala.collection.Iterator.foreach(Iterator.scala:941)
>
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>
> at 
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>
> at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>
> at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>
> at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>
> at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>
> at 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>
> at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
>
>
> Here is the scala code which used to connect to broker.
>
>
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.
> *PreferConsistent*import 
> org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>
>
> val kafkaParams = *Map*[String, Object](
> "bootstrap.servers" -> kafkaServer,
> "key.deserializer" -> *classOf*[StringDeserializer],
> "value.deserializer" -> *classOf*[StringDeserializer],
> "group.id" -> connectionID,
> "auto.offset.reset" -> "earliest",
> "enable.auto.commit" -> (true: java.lang.Boolean),
> "partition.assignment.strategy" 
> ->"org.apache.kafka.clients.consumer.RangeAssignor"
>   )
>
>   val topics = *Array*(connectionID)
>  

Re: 21/09/27 23:34:03 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2021-09-27 Thread Stelios Philippou
It might be possible that you do not have the resources on the cluster, so
your job will remain waiting for them as they cannot be provided.

On Tue, 28 Sep 2021, 04:26 davvy benny,  wrote:

> How can I solve the problem?
>
> On 2021/09/27 23:05:41, Thejdeep G  wrote:
> > Hi,
> >
> > That would usually mean that the application has not been allocated the
> executor resources from the resource manager yet.
> >
> > On 2021/09/27 21:37:30, davvy benny  wrote:
> > > Hi
> > > I am trying to run spark programmatically from eclipse with these
> configurations for hadoop cluster locally
> > > SparkConf sparkConf = new
> SparkConf().setAppName("simpleTest2").setMaster("yarn")
> > > .set("spark.executor.memory", "1g")
> > > .set("deploy.mode", "cluster")
> > > .set("spark.yarn.stagingDir",
> "hdfs://localhost:9000/user/hadoop/")
> > > .set("spark.shuffle.service.enabled", "false")
> > > .set("spark.dynamicAllocation.enabled", "false")
> > > .set("spark.cores.max", "1")
> > > .set("spark.executor.instances","2")
> > > .set("spark.executor.memory","500m") //
> > > .set("spark.executor.cores","1")//
> > >
>  .set("spark.yarn.nodemanager.resource.cpu-vcores","4")
> > > .set("spark.yarn.submit.file.replication",
> "1")
> > > .set("spark.yarn.jars",
> "hdfs://localhost:9000/user/hadoop/davben/jars/*.jar")
> > >
> > > When I check on the http://localhost:8088/cluster/apps/RUNNING I can
> see that my job is submitted but y terminal loops saying
> > > 21/09/27 23:36:33 WARN YarnScheduler: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources
> > >
> > > I ve noticed that this occurs after the application of a map on my
> Dataset.
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
My local Spark submit :
 ~/development/SimpleKafkaStream  spark-submit --version



Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
  /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z


On K8  i have one for j8 and one for j11
The K8 Docker env :

/opt/spark/bin/spark-submit --version
Welcome to
 __
/ __/__ ___ _/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.1
/_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302

The k8 J11 Env :
:/opt/spark/work-dir$ /opt/spark/bin/spark-submit --version
Welcome to
 __
/ __/__ ___ _/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.1
/_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.11


Will downgrade now to check for 3.1.1 as you mentioned, but as this is a
minor version I don't believe that there should be any issues there.

On Mon, 6 Sept 2021 at 16:12, Mich Talebzadeh 
wrote:

>
>1. which version of Spark the docker is built for
>2. Which version of spark-submit you are using to submit this job
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 6 Sept 2021 at 14:07, Stelios Philippou 
> wrote:
>
>> Yes on Local mode both from intelli and using spark-submit on my machine
>> and on a windows machine work.
>>
>> I have noticed the following error when adding this in the above
>> spark-submit for k8
>>
>> --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
>>
>>
>> :: resolving dependencies ::
>> org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0
>>
>> confs: [default]
>>
>> Exception in thread "main" java.io.FileNotFoundException:
>> /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
>> (No such file or directory)
>>
>>
>>
>> is there some way to verify that the k8 installation is correct ?
>>
>> Other spark processes that do not have streaming involved do work
>> correctly.
>>
>> On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> Have you tried this on local mode as opposed to Kubernetes to see if it
>>> works?
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou 
>>> wrote:
>>>
>>>> Hello Jacek,
>>>>
>>>> Yes this is a spark-streaming.
>>>>  I have removed all code and created a new project with just the base
>>>> code that is enough to open a stream and loop over it to see what i am
>>>> doing wrong.
>>>>
>>>> Not adding the packages would result me in the following error
>>>>
>>>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>>>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>>>> java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>
>>>> at java.lang.Class.forName0(Native Method)
>>>>
>>>> at java.lang.Class.forName(Class.java:348)
>>>>
>>>> at

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
Yes, local mode works both from IntelliJ and using spark-submit, on my
machine and on a Windows machine.

I have noticed the following error when adding this to the above
spark-submit for K8s:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \


:: resolving dependencies ::
org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0

confs: [default]

Exception in thread "main" java.io.FileNotFoundException:
/opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
(No such file or directory)



Is there some way to verify that the K8s installation is correct?

Other spark processes that do not have streaming involved do work
correctly.

On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh 
wrote:

>
> Hi,
>
>
> Have you tried this on local mode as opposed to Kubernetes to see if it
> works?
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou 
> wrote:
>
>> Hello Jacek,
>>
>> Yes this is a spark-streaming.
>>  I have removed all code and created a new project with just the base
>> code that is enough to open a stream and loop over it to see what i am
>> doing wrong.
>>
>> Not adding the packages would result me in the following error
>>
>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>> Which should not really be the case cause this should be included in the
>> kubernetes pod. Anyway I can confirm this ?
>>
>>
>> So my simple class is as follow :
>>
>>
>> streamingContext = new JavaStreamingContext(javaSparkContext, 
>> Durations.seconds(5));
>>
>> stream = KafkaUtils.createDirectStream(streamingContext, 
>> LocationStrategies.PreferConsistent(),
>>ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>
>> stream.foreachRDD((VoidFunction>>) 
>> rdd -> {
>>try {
>>   rdd.foreachPartition(partition -> {
>>  while (partition.hasNext()) {
>> ConsumerRecord consumerRecord = partition.next();
>> LOGGER.info("WORKING " + consumerRecord.topic() 
>> +consumerRecord.partition() + ": "+consumerRecord.offset());
>>  }
>>   });
>>} catch (Exception e) {
>>   e.printStackTrace();
>>}
>> });
>>
>> streamingContext.start();
>> try {
>>streamingContext.awaitTermination();
>> } catch (InterruptedException e) {
>>e.printStackTrace();
>> } finally {
>>streamingContext.stop();
>>javaSparkContext.stop();
>> }
>>
>>
>> This is all there is too the class which is a java boot @Component.
>>
>> Now in order my pom is as such
>>
>> 
>> http://maven.apache.org/POM/4.0.0;
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>   4.0.0
>>
>>   com.kafka
>>   SimpleKafkaStream
>>   1.0
>>
>>   jar
>>
>>   
>> UTF-8
>> 
>> UTF-8
>> 8
>> 8
>> com.kafka.Main
>>   
>>
>>   
>> org.springframework.boot
>> spring-boot-starter-parent
>> 2.4.2
>> 
>>   
>>
>>   
>> 
>>   org.springframework.boot
>>   spring-boot-starter
>>   
>> 
>>   org.springframework.boot
>>   spring-boot

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
; "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou 
> wrote:
>
>> Yes you are right.
>> I am using Spring Boot for this.
>>
>> The same does work for the event that does not involve any kafka events.
>> But again i am not sending out extra jars there so nothing is replaced and
>> we are using the default ones.
>>
>> If i do not use the userClassPathFirst which will force the service to
>> use the newer version i will end up with the same problem
>>
>> We are using protobuf v3+ and as such we need to push that version since
>> apache core uses an older version.
>>
>> So all we should really need is the following : --jars
>> "protobuf-java-3.17.3.jar" \
>> and here we need the userClassPathFirst=true in order to use the latest
>> version.
>>
>>
>> Using only this jar as it works on local or no jars defined we ended up
>> with the following error.
>>
>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.Class.forName0(Native Method)
>>
>> at java.base/java.lang.Class.forName(Unknown Source)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>>
>>
>> Which can be resolved with passing more jars.
>>
>>
>> Any idea about this error ?
>>
>> K8 does not seem to like this, but Java Spring should be the one that is
>> responsible for the version but it seems K8 does not like this versions.
>>
>> Perhaps miss configuration on K8 ?
>>
>> I haven't set that up so i am not aware of what was done there.
>>
>>
>>
>> For downgrading to java 8 on my K8 might not be so easy. I want to
>> explore if there is something else before doing that as we will need to
>> spin off new instances of K8 to check that.
>>
>>
>>
>> Thank you for the time taken
>>
>>
>>
>>
>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski  wrote:
>>
>>> Hi Stelios,
>>>
>>> I've never seen this error before, but a couple of things caught
>>> my attention that I would look at closer to chase the root cause of the
>>> issue.
>>>
>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>> Application run failed" seem to indicate that you're using Spring Boot
>>> (that I know almost nothing about so take the following with a pinch of
>>> salt :))
>>>
>>> Spring Boot manages the classpath by itself and together with another
>>> interesting option in your
>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>> much this exception:
>>>
>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>> subtype
>>>
>>> could be due to casting compatible types from two different classloaders?
>>>
>>> Just a thought but wanted to share as I think it's worth investigating.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have been facing the current issue for some time now and I was
>>>> wondering if someone might have some inside on how I can resolve the
>>>> following.
>>>>
>>>> The code (java 11) is working correctly on my local machine but
>>>> whenever I try to launch the following on K8 I am getting the follo

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-08-31 Thread Stelios Philippou
Yes you are right.
I am using Spring Boot for this.

The same does work for the job that does not involve any Kafka events.
But again, I am not sending out extra jars there, so nothing is replaced and
we are using the default ones.

If I do not use userClassPathFirst, which forces the service to use
the newer version, I will end up with the same problem.

We are using protobuf v3+ and as such we need to push that version since
apache core uses an older version.

So all we should really need is the following : --jars
"protobuf-java-3.17.3.jar" \
and here we need the userClassPathFirst=true in order to use the latest
version.


Using only this jar as it works on local or no jars defined we ended up
with the following error.

21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka010.KafkaRDDPartition

at java.base/java.net.URLClassLoader.findClass(Unknown Source)

at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

at java.base/java.lang.Class.forName0(Native Method)

at java.base/java.lang.Class.forName(Unknown Source)

at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)




Which can be resolved with passing more jars.


Any idea about this error ?

K8s does not seem to like this; Spring should be the one that is
responsible for the version, but it seems K8s does not like these versions.

Perhaps a misconfiguration on K8s?

I haven't set that up, so I am not aware of what was done there.



Downgrading to Java 8 on my K8s might not be so easy. I want to explore
whether there is something else before doing that, as we will need to spin up
new instances of K8s to check that.



Thank you for the time taken




On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski  wrote:

> Hi Stelios,
>
> I've never seen this error before, but a couple of things caught
> my attention that I would look at closer to chase the root cause of the
> issue.
>
> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
> Application run failed" seem to indicate that you're using Spring Boot
> (that I know almost nothing about so take the following with a pinch of
> salt :))
>
> Spring Boot manages the classpath by itself and together with another
> interesting option in your
> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
> much this exception:
>
> > org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype
>
> could be due to casting compatible types from two different classloaders?
>
> Just a thought but wanted to share as I think it's worth investigating.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou 
> wrote:
>
>> Hello,
>>
>> I have been facing the current issue for some time now and I was
>> wondering if someone might have some inside on how I can resolve the
>> following.
>>
>> The code (java 11) is working correctly on my local machine but whenever
>> I try to launch the following on K8 I am getting the following error.
>>
>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>> initializing SparkContext.
>>
>> java.util.ServiceConfigurationError:
>> org.apache.spark.scheduler.ExternalClusterManager:
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>> subtype
>>
>>
>>
>> I have a spark that will monitor some directories and handle the data
>> accordingly.
>>
>> That part is working correctly on K8 and the SparkContext has no issue
>> being initialized there.
>>
>>
>> This is the spark-submit for that
>>
>>
>> spark-submit \
>> --master=k8s://https://url:port \
>> --deploy-mode cluster \
>> --name a-name\
>> --conf spark.driver.userClassPathFirst=true  \
>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>> --conf spark.kubernetes.namespace=spark \
>> --conf spark.kubernet

Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-08-31 Thread Stelios Philippou
Hello,

I have been facing the current issue for some time now and I was wondering
if someone might have some insight on how I can resolve the following.

The code (Java 11) is working correctly on my local machine, but whenever I
try to launch it on K8s I get the following error.

21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error initializing
SparkContext.

java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype



I have a spark that will monitor some directories and handle the data
accordingly.

That part is working correctly on K8 and the SparkContext has no issue
being initialized there.


This is the spark-submit for that


spark-submit \
--master=k8s://https://url:port \
--deploy-mode cluster \
--name a-name\
--conf spark.driver.userClassPathFirst=true  \
--conf spark.kubernetes.file.upload.path=hdfs://upload-path \
--files "application-dev.properties,keystore.jks,truststore.jks"  \
--conf spark.kubernetes.container.image=url/spark:spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.dynamicAllocation.enabled=false \
--driver-memory 525m --executor-memory 525m \
--num-executors 1 --executor-cores 1 \
target/SparkStream.jar continuous-merge


My issue comes when I try to launch the service in order to listen to kafka
events and store them in HDFS.


spark-submit \
--master=k8s://https://url:port \
--deploy-mode cluster \
--name consume-data \
--conf spark.driver.userClassPathFirst=true  \
--conf spark.kubernetes.file.upload.path=hdfs://upload-path\
--files "application-dev.properties,keystore.jks,truststore.jks"  \
--jars 
"spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
\
--conf spark.kubernetes.container.image=url/spark:spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.dynamicAllocation.enabled=false \
--driver-memory 1g --executor-memory 1g \
--num-executors 1 --executor-cores 1 \
target/SparkStream.jar consume


It could be that I am launching the application wrongly, or perhaps my
K8s is not configured correctly?



I have stripped my code down to the bare bones and still end up with the
following issue:


21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error initializing
SparkContext.

java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype

at java.base/java.util.ServiceLoader.fail(Unknown Source)

at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
Source)

at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
Source)

at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)

at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)

at
scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:


21/08/31 07:28:42 WARN
org.springframework.context.annotation.AnnotationConfigApplicationContext:
Exception encountered during context initialization - cancelling refresh
attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
Error creating bean with name 'mainApplication': Unsatisfied dependency
expressed through field 'streamAllKafkaData'; nested exception is
org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name 'streamAllKafkaData': Unsatisfied dependency
expressed through field 'javaSparkContext'; nested exception is
org.springframework.beans.factory.BeanCreationException: Error creating
bean with name 'javaSparkContext' defined in class path resource
[com/configuration/SparkConfiguration.class]: Bean instantiation via
factory method failed; nested exception is
org.springframework.beans.BeanInstantiationException: Failed to instantiate
[org.apache.spark.api.java.JavaSparkContext]: Factory method
'javaSparkContext' threw exception; nested exception is
java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype

21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
Application run failed

org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name 'mainApplication': Unsatisfied dependency expressed
through field 'streamAllKafkaData'; nested exception is
org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name