Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-15 Thread Mich Talebzadeh
Yes, that is true. A UUID only makes each record unique. Some NoSQL
databases require a primary key, and a UUID can be used for that.


import java.util.UUID

scala> var pk = UUID.randomUUID
pk: java.util.UUID = 0d91e11a-f5f6-4b4b-a120-8c46a31dad0b

scala> pk = UUID.randomUUID
pk: java.util.UUID = 137ab1ef-625a-4277-9d94-9f4a11d793fc


So the values are completely random, with no ordering between consecutive calls.


The Kafka producer requires a key/value pair, so we generate a UUID key as the
unique identifier of each Kafka record:


 import uuid
 from pyspark.sql import functions as F, types as T
 uuidUdf = F.udf(lambda: str(uuid.uuid4()), T.StringType()).asNondeterministic()
 result = df.withColumn("uuid", uuidUdf())


So, back to your question: what is the use case for identity() in your SSS
application? If you want values as close to gap-free as possible (even MSSQL
can have gaps in an identity column, for example after a crash), you need to
store the last value in persistent storage such as a Hive table and start
from there:


// COALESCE handles an empty table; the CAST makes getLong safe for INT or BIGINT ids
val start = spark.sql(
  "SELECT CAST(COALESCE(MAX(id), 0) AS BIGINT) FROM test.randomData"
).collect()(0).getLong(0) + 1
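A minimal sketch of the "continue from the last stored value" approach, written in plain Python rather than Spark so the logic is easy to follow. The dictionary `_store` and the helpers `load_high_water_mark` / `save_high_water_mark` are hypothetical stand-ins for reading MAX(id) from a Hive table and persisting the new maximum; they are not part of any Spark API.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-in for a persistent store such as a Hive table.
_store: Dict[str, int] = {}


def load_high_water_mark(key: str) -> int:
    """Return the last ID handed out for this table, or 0 if none yet."""
    return _store.get(key, 0)


def save_high_water_mark(key: str, value: int) -> None:
    """Persist the new maximum ID (hypothetical; a real job would write to storage)."""
    _store[key] = value


def assign_ids(key: str, records: List[str]) -> List[Tuple[int, str]]:
    """Give each record the next contiguous ID after the persisted maximum."""
    start = load_high_water_mark(key) + 1
    numbered = [(start + i, rec) for i, rec in enumerate(records)]
    if numbered:
        # Only advance the stored maximum once the whole batch is numbered.
        save_high_water_mark(key, numbered[-1][0])
    return numbered


first = assign_ids("test.randomData", ["a", "b", "c"])
second = assign_ids("test.randomData", ["d", "e"])
print(first)   # [(1, 'a'), (2, 'b'), (3, 'c')]
print(second)  # [(4, 'd'), (5, 'e')]
```

This stays gap-free only if a single writer updates the stored maximum and that update commits together with the data; a crash between the two can leave gaps or duplicate IDs, much like the identity-column gaps mentioned above.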


HTH




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 15 Jul 2021 at 20:40, Felix Kizhakkel Jose <
felixkizhakkelj...@gmail.com> wrote:


Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-15 Thread Felix Kizhakkel Jose
Thank you so much for the insights.
@Mich Talebzadeh Really appreciate your
detailed examples.
@Jungtaek Lim I see your point. I am thinking of having a mapping table
from UUID to an incremental ID and leveraging range pruning etc. on a large
dataset.
@sebastian I have to check how to do something like Snowflake IDs. Do you
have any examples or pointers?

Let me ask another way: how are you handling non-incrementing UUIDs?
Parquet range statistics store min and max values, but if your ID is a
UUID, they don't help decide whether the value you are searching for is
present in a file without scanning the entire file, because min/max
statistics don't work well on UUIDs.
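To illustrate the min/max point, here is a small plain-Python simulation (not Parquet itself): each hypothetical "file" keeps only the minimum and maximum of its ID column, as row-group statistics do, and a lookup can skip a file only when the key falls outside that range. The file count, row count and random seed are arbitrary assumptions.

```python
import random
import uuid
from typing import List, Tuple


def file_stats(values: List) -> Tuple:
    """Min/max statistics for one simulated file of values."""
    return min(values), max(values)


def candidate_files(stats: List[Tuple], key) -> int:
    """Count files whose [min, max] range could contain key, i.e. cannot be skipped."""
    return sum(1 for lo, hi in stats if lo <= key <= hi)


rng = random.Random(42)  # seeded so the simulation is reproducible
n_files, rows_per_file = 4, 1000

# Sequential IDs: each file holds a contiguous, non-overlapping block.
seq_files = [list(range(f * rows_per_file, (f + 1) * rows_per_file)) for f in range(n_files)]

# Random version-4 UUIDs stored as strings (fixed-length lowercase hex sorts like the number).
uuid_files = [
    [str(uuid.UUID(int=rng.getrandbits(128), version=4)) for _ in range(rows_per_file)]
    for _ in range(n_files)
]

seq_stats = [file_stats(v) for v in seq_files]
uuid_stats = [file_stats(v) for v in uuid_files]

# Look up a value that really exists in the third file.
seq_hits = candidate_files(seq_stats, seq_files[2][500])
uuid_hits = candidate_files(uuid_stats, uuid_files[2][500])
print(f"sequential id: {seq_hits} of {n_files} files must be read")
print(f"uuid:          {uuid_hits} of {n_files} files must be read")
```

With sequential IDs only one file overlaps the key; with random UUIDs each file's range covers almost the whole key space, so practically nothing can be skipped. That is the motivation for the UUID-to-incremental-ID mapping table mentioned above.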

Please share your experiences or ideas on how you handled this situation.

Regards,
Felix K Jose

On Tue, Jul 13, 2021 at 7:59 PM Jungtaek Lim 
wrote:

> Theoretically, a composite value of batchId +
> monotonically_increasing_id() would achieve the goal. The major downside is
> that you'll need to deduplicate output based on batchId, as
> monotonically_increasing_id() is non-deterministic. You need to ensure
> there's NO overlap in output across multiple reattempts of the same batch
> ID.
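One way to read "batchId + monotonically_increasing_id()" is as a composite key. The plain-Python sketch below packs a batch ID and a per-batch row index into a single sortable 64-bit value. It assumes a contiguous per-batch index (for example one produced with RDD `zipWithIndex`) and fewer than 2**40 rows per batch; raw `monotonically_increasing_id()` values do not fit this layout, because the partition ID already occupies their upper bits, so with those you would keep batchId and the ID as two columns and sort on both. `ROW_BITS` and `number_batch` are illustrative choices, not Spark APIs.

```python
from typing import Iterable, List, Tuple

ROW_BITS = 40  # assumption: fewer than 2**40 rows per micro-batch
# Keep the packed value a non-negative signed 64-bit long (about 8.4 million batches).
MAX_BATCH_ID = (1 << (63 - ROW_BITS)) - 1


def compose_id(batch_id: int, row_index: int) -> int:
    """Pack (batch_id, row_index) into one sortable 64-bit value."""
    if not 0 <= row_index < (1 << ROW_BITS):
        raise ValueError("row_index does not fit in ROW_BITS")
    if not 0 <= batch_id <= MAX_BATCH_ID:
        raise ValueError("batch_id does not fit in the remaining bits")
    return (batch_id << ROW_BITS) | row_index


def decompose_id(value: int) -> Tuple[int, int]:
    """Recover (batch_id, row_index) from a packed value."""
    return value >> ROW_BITS, value & ((1 << ROW_BITS) - 1)


def number_batch(batch_id: int, rows: Iterable[str]) -> List[Tuple[int, str]]:
    """Stand-in for numbering the rows handed to one foreachBatch call."""
    return [(compose_id(batch_id, i), row) for i, row in enumerate(rows)]


b0 = number_batch(0, ["x", "y"])
b1 = number_batch(1, ["z"])
ids = [i for i, _ in b0 + b1]
print(ids == sorted(ids))      # True: later batches always sort after earlier ones
print(decompose_id(b1[0][0]))  # (1, 0)
```

The result is unique and increasing across batches but not gap-free, and it is only stable across retries if the per-batch index is deterministic.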
>
> Btw, even assuming you handle auto-increasing IDs on write, how do you
> read the files and apply range pruning by that ID? Is the approach
> scalable and efficient? You probably couldn't avoid reading unnecessary
> files unless you build explicit metadata about the files, such as a map
> from file name to ID range, and also write a custom reader that leverages
> that information.
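A minimal sketch of the file-level metadata idea, in plain Python: a hypothetical side-table records the ID range of every output file, and a reader consults it to open only the files whose range overlaps the query. `FileRange`, `prune`, the file names and the ranges are all made up for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FileRange:
    path: str    # hypothetical output file name
    min_id: int  # smallest ID written to this file
    max_id: int  # largest ID written to this file


def prune(index: List[FileRange], lo: int, hi: int) -> List[str]:
    """Return only the files whose [min_id, max_id] overlaps the query range [lo, hi]."""
    return [f.path for f in index if f.min_id <= hi and f.max_id >= lo]


# Hypothetical side-table maintained by the writer, one entry per file.
index = [
    FileRange("part-00000.parquet", 1, 1000),
    FileRange("part-00001.parquet", 1001, 2000),
    FileRange("part-00002.parquet", 2001, 3000),
]

print(prune(index, 1500, 2100))  # ['part-00001.parquet', 'part-00002.parquet']
print(prune(index, 5000, 6000))  # []
```

Parquet footers already hold per-row-group min/max statistics that Spark can use for filter pushdown, but they only prune well when IDs are clustered within files, which holds for increasing IDs and not for random UUIDs.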
>
>
> On Wed, Jul 14, 2021 at 6:00 AM Sebastian Piu 
> wrote:
>
>> If you want them to survive across jobs you can use snowflake IDs or
>> similar ideas depending on your use case
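For reference, a snowflake-style generator packs a millisecond timestamp, a worker ID and a per-millisecond sequence into one 64-bit integer, so IDs from different jobs stay unique and roughly time-ordered without central coordination. The sketch below uses the 41/10/12-bit split popularised by Twitter's Snowflake; the custom epoch (2021-01-01 UTC), the class name and the injectable clock are assumptions for the example.

```python
import threading
import time
from typing import Callable


class SnowflakeGenerator:
    """Snowflake-style 64-bit IDs: 41-bit ms timestamp | 10-bit worker id | 12-bit sequence."""

    EPOCH_MS = 1609459200000  # assumption: custom epoch 2021-01-01T00:00:00Z
    WORKER_BITS = 10
    SEQUENCE_BITS = 12

    def __init__(self, worker_id: int,
                 clock_ms: Callable[[], int] = lambda: int(time.time() * 1000)) -> None:
        if not 0 <= worker_id < (1 << self.WORKER_BITS):
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        self.clock_ms = clock_ms
        self.last_ms = -1
        self.sequence = 0
        self.lock = threading.Lock()

    def next_id(self) -> int:
        with self.lock:
            now = self.clock_ms()
            if now < self.last_ms:
                raise RuntimeError("clock moved backwards; refusing to risk duplicate IDs")
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & ((1 << self.SEQUENCE_BITS) - 1)
                if self.sequence == 0:
                    # All 4096 sequence values used this millisecond: wait for the next one.
                    while now <= self.last_ms:
                        now = self.clock_ms()
            else:
                self.sequence = 0
            self.last_ms = now
            return (
                ((now - self.EPOCH_MS) << (self.WORKER_BITS + self.SEQUENCE_BITS))
                | (self.worker_id << self.SEQUENCE_BITS)
                | self.sequence
            )


gen = SnowflakeGenerator(worker_id=3)
a, b = gen.next_id(), gen.next_id()
print(a < b)  # True
```

The IDs increase over time but are not consecutive. In Spark, worker IDs must be unique among tasks generating IDs at the same moment; deriving one from `TaskContext.get().partitionId()` is a possible (hypothetical) choice that only holds if the query has at most 1024 partitions and no other job generates IDs with the same worker IDs concurrently.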
>>
>> On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you mean a monotonically incrementing ID, like an Oracle sequence, for
>>> each record read from Kafka, added to your DataFrame?
>>>
>>> If you use Structured Streaming in micro-batch mode, you get what is
>>> known as the batchId.
>>>
>>> from pyspark.sql.functions import col
>>>
>>> result = streamingDataFrame.select( \
>>>     col("parsed_value.rowkey").alias("rowkey") \
>>>   , col("parsed_value.ticker").alias("ticker") \
>>>   , col("parsed_value.timeissued").alias("timeissued") \
>>>   , col("parsed_value.price").alias("price")). \
>>>   writeStream. \
>>>   outputMode('append'). \
>>>   option("truncate", "false"). \
>>>   foreachBatch(sendToSink). \
>>>   trigger(processingTime='30 seconds'). \
>>>   option('checkpointLocation', checkpoint_path). \
>>>   queryName(config['MDVariables']['topic']). \
>>>   start()
>>>
>>> Spark calls sendToSink with two arguments: the micro-batch DataFrame df
>>> and its batchId.
>>>
>>> def sendToSink(df, batchId):
>>>     if len(df.take(1)) > 0:
>>>         print(f"md batchId is {batchId}")
>>>         df.show(100, False)
>>>         df.persist()
>>>         # write to BigQuery batch table
>>>         s.writeTableToBQ(df, "append",
>>>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>>         df.unpersist()
>>>         print("wrote to DB")
>>>     else:
>>>         print("DataFrame md is empty")
>>>
>>> That batchId value can be used to identify each batch.
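Because foreachBatch gives at-least-once guarantees, a failed micro-batch can be re-run with the same batchId, and the Spark documentation suggests using that ID to deduplicate output. A toy plain-Python sketch of the idea: the hypothetical `BatchIdempotentSink` stores output per batchId, so a replay replaces the earlier attempt instead of appending a second copy, even if non-deterministic IDs differ between attempts.

```python
from typing import Dict, List


class BatchIdempotentSink:
    """Toy sink keeping at most one committed output per batchId (hypothetical)."""

    def __init__(self) -> None:
        self.by_batch: Dict[int, List[str]] = {}

    def write(self, batch_id: int, rows: List[str]) -> None:
        # Overwrite whatever an earlier attempt of the same batch wrote.
        self.by_batch[batch_id] = list(rows)

    def all_rows(self) -> List[str]:
        # Return committed rows in batch order.
        return [row for b in sorted(self.by_batch) for row in self.by_batch[b]]


sink = BatchIdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
sink.write(1, ["c"])    # replay of batch 1 after a failure
print(sink.all_rows())  # ['a', 'b', 'c']
```

With a real table the same effect comes from overwriting the output keyed by batchId, or from recording committed batch IDs and skipping ones already written.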
>>>
>>>
>>> Otherwise, you can do this:
>>>
>>> from pyspark.sql import functions as F
>>> startval = 1
>>> df = df.withColumn('id', F.monotonically_increasing_id() + startval)
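Note that monotonically_increasing_id() values are unique but not consecutive: the Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below reproduces that documented layout for two partitions of three rows each, offset by startval; `monotonic_ids` is an illustrative helper, not a Spark function. Since the numbering restarts in every micro-batch, adding a constant alone does not continue IDs across batches or jobs.

```python
from typing import List


def monotonic_ids(rows_per_partition: List[int]) -> List[int]:
    """Mimic the documented layout: (partition id << 33) + record number within partition."""
    ids = []
    for partition_id, n_rows in enumerate(rows_per_partition):
        ids.extend((partition_id << 33) + i for i in range(n_rows))
    return ids


startval = 1
ids = [startval + x for x in monotonic_ids([3, 3])]
print(ids)  # [1, 2, 3, 8589934593, 8589934594, 8589934595]
```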
>>>
>>> HTH
>>>
>>>
>>>
>>> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
>>> felixkizhakkelj...@gmail.com> wrote:
>>>
 Hello,

 I am using Spark Structured Streaming to sink data from Kafka to AWS
 S3. Is it possible for me to introduce a uniquely incrementing
 identifier for each record, as we do in an RDBMS (an incrementing
 long id)?
 This would greatly help range pruning when reading based on this
 ID.

 Any thoughts? I have looked at monotonically_increasing_id, but it seems
 it's not deterministic, and it won't ensure that new records get the next
 id after the latest id already present in the storage (S3).

 Regards,
 Felix K Jose

>>>


Blog post introducing (Apache) DataFu-Spark

2021-07-15 Thread Eyal Allweil
Hi all,

Apache DataFu is an Apache project of general-purpose
Hadoop utilities, and *datafu-spark* is a new module in this project
with general utilities and UDFs that can be useful to Spark developers.

This is a blog post I wrote introducing some of the APIs in DataFu-Spark:

https://medium.com/paypal-tech/introducing-datafu-spark-ba67faf1933a

Cheers,
Eyal


Re: Unable to write data into hive table using Spark via Hive JDBC driver Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED

2021-07-15 Thread Mich Talebzadeh
Have you created that table in Hive, or are you trying to create it from
Spark itself?

Your Hive is local, so you don't need a JDBC connection. Have you
tried:

df2.write.mode("overwrite").saveAsTable("mydb.mytable")
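For context on the error itself: the ParseException in the original post shows the generated statement containing '"first_name"' 'TEXT'. Spark's default JDBC dialect quotes column names with double quotes and maps StringType to TEXT, while HiveQL expects backtick-quoted identifiers and the STRING type. The toy plain-Python function below (hypothetical, covering only string columns) renders both forms for the sample DataFrame's columns.

```python
from typing import List, Tuple

# Simplified type maps; only the string case is illustrated here.
GENERIC_TYPES = {"string": "TEXT"}  # what Spark's default JDBC dialect emits for StringType
HIVE_TYPES = {"string": "STRING"}   # what HiveQL expects


def column_list(columns: List[Tuple[str, str]], hive: bool) -> str:
    """Render the column part of a CREATE TABLE statement in either style."""
    quote = "`" if hive else '"'
    types = HIVE_TYPES if hive else GENERIC_TYPES
    return ", ".join(f"{quote}{name}{quote} {types[t]}" for name, t in columns)


cols = [("first_name", "string"), ("last_name", "string"), ("country", "string")]
print(column_list(cols, hive=False))  # "first_name" TEXT, "last_name" TEXT, "country" TEXT
print(column_list(cols, hive=True))   # `first_name` STRING, `last_name` STRING, `country` STRING
```

Writing through a Hive-enabled session with saveAsTable, as suggested above, avoids generating this DDL altogether; staying with JDBC would require a custom JdbcDialect registered on the JVM side.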

HTH








On Thu, 15 Jul 2021 at 12:51, Badrinath Patchikolla <
pbadrinath1...@gmail.com> wrote:


Unable to write data into hive table using Spark via Hive JDBC driver Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED

2021-07-15 Thread Badrinath Patchikolla
Hi,

I am trying to write data from Spark to Hive in JDBC mode. Below is the
sample code:

Spark standalone, version 2.4.7

21/07/15 08:04:07 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id =
app-20210715080414-0817).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(
("John", "Smith", "London"),
("David", "Jones", "India"),
("Michael", "Johnson", "Indonesia"),
("Chris", "Lee", "Brazil"),
("Mike", "Brown", "Russia")
  ).toDF("first_name", "last_name", "country")


 df.write
  .format("jdbc")
  .option("url",
"jdbc:hive2://localhost:1/foundation;AuthMech=2;UseNativeQuery=0")
  .option("dbtable", "test.test")
  .option("user", "admin")
  .option("password", "admin")
  .option("driver", "com.cloudera.hive.jdbc41.HS2Driver")
  .mode("overwrite")
  .save


// Exiting paste mode, now interpreting.

java.sql.SQLException: [Cloudera][HiveJDBCDriver](500051) ERROR processing
query/statement. Error Code: 4, SQL state:
TStatus(statusCode:ERROR_STATUS,
infoMessages:[*org.apache.hive.service.cli.HiveSQLException:Error while
compiling statement: FAILED: ParseException line 1:39 cannot recognize
input near '"first_name"' 'TEXT' ',' in column name or primary key or
foreign key:28:27,
org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:329,
org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:207,
org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:290,
org.apache.hive.service.cli.operation.Operation:run:Operation.java:260,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:504,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementAsync:HiveSessionImpl.java:490,
sun.reflect.GeneratedMethodAccessor13:invoke::-1,
sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43,
java.lang.reflect.Method:invoke:Method.java:498,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78,
org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36,
org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63,
java.security.AccessController:doPrivileged:AccessController.java:-2,
javax.security.auth.Subject:doAs:Subject.java:422,
org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1875,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59,
com.sun.proxy.$Proxy35:executeStatementAsync::-1,
org.apache.hive.service.cli.CLIService:executeStatementAsync:CLIService.java:295,
org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:507,
org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1437,
org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1422,
org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39,
org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39,
org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56,
org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286,
java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149,
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624,
java.lang.Thread:run:Thread.java:748,
*org.apache.hadoop.hive.ql.parse.ParseException:line 1:39 cannot recognize
input near '"first_name"' 'TEXT' ',' in column name or primary key or
foreign key:33:6,
org.apache.hadoop.hive.ql.parse.ParseDriver:parse:ParseDriver.java:221,
org.apache.hadoop.hive.ql.parse.ParseUtils:parse:ParseUtils.java:75,
org.apache.hadoop.hive.ql.parse.ParseUtils:parse:ParseUtils.java:68,
org.apache.hadoop.hive.ql.Driver:compile:Driver.java:564,
org.apache.hadoop.hive.ql.Driver:compileInternal:Driver.java:1425,
org.apache.hadoop.hive.ql.Driver:compileAndRespond:Driver.java:1398,
org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:205],
sqlState:42000, errorCode:4, errorMessage:Error while compiling
statement: FAILED: ParseException line 1:39 cannot recognize input near
'"first_name"' 'TEXT' ',' in column name or primary key or foreign key),
Query: 

[Spark SQL] : at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getDecimal(OrcColumnVector.java:158)

2021-07-15 Thread Ragini Manjaiah


Hi Team

I am trying to read from an HDFS path that is partitioned on sales date.
When I select one particular column, of type decimal(32,20)
(nullable = true), the Spark job fails. When I exclude this column and
select the others, it works fine.

The job fails at
org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getDecimal(OrcColumnVector.java:158)

Alternative approach I took: I copied a sample of the data into another
HDFS path and queried it, and that works fine.

In what scenarios could we hit the above issue?


Re: compile spark 3.1.1 error

2021-07-15 Thread jiahong li
So far, I have not found a solution.

Dereck Li
Apache Spark Contributor
Continuing Learner
@Hangzhou,China


jason_xu wrote on Tue, 11 May 2021 at 08:01:

> Hi Jiahong, I got the same failure building Spark 3.1.1 with Hadoop
> 2.8.5.
> Any chance you found a solution?