Re: In windows 10, accessing Hive from PySpark with PyCharm throws error

2020-12-03 Thread Artemis User
You don't have to include all your config and log messages; the error
message would suffice.  The java.lang.UnsatisfiedLinkError exception
indicates that the JVM can't find some OS-specific libraries (commonly
referred to as native libraries).  On Windows these would be DLL files.
Look into your Hadoop installation and you will find the
$HADOOP_HOME/lib/native directory, which holds the OS-specific library
files (on Windows the location may differ; the native DLLs typically
live under %HADOOP_HOME%\bin).  Add that directory to your PATH
environment variable in your command shell before running spark-submit
again.


-- ND

On 12/3/20 6:28 PM, Mich Talebzadeh wrote:

This is becoming a serious pain.

Using PowerShell, I am running spark-submit as follows:

PS C:\Users\admin> spark-submit.cmd 
C:\Users\admin\PycharmProjects\pythonProject\main.py



subscribe

2020-12-03 Thread 邮件帮助中心



Re: In windows 10, accessing Hive from PySpark with PyCharm throws error

2020-12-03 Thread Mich Talebzadeh
This is becoming a serious pain.

Using PowerShell, I am running spark-submit as follows:

PS C:\Users\admin> spark-submit.cmd
C:\Users\admin\PycharmProjects\pythonProject\main.py

WARNING: An illegal reflective access operation has occurred

WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
(file:/D:/temp/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
java.nio.DirectByteBuffer(long,int)

WARNING: Please consider reporting this to the maintainers of
org.apache.spark.unsafe.Platform

WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations

WARNING: All illegal access operations will be denied in a future release

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

20/12/03 23:13:59 INFO SparkContext: Running Spark version 3.0.1

20/12/03 23:13:59 INFO ResourceUtils:
==

20/12/03 23:13:59 INFO ResourceUtils: Resources for spark.driver:


20/12/03 23:13:59 INFO ResourceUtils:
==

20/12/03 23:13:59 INFO SparkContext: Submitted application: App1

20/12/03 23:13:59 INFO SecurityManager: Changing view acls to: admin

20/12/03 23:13:59 INFO SecurityManager: Changing modify acls to: admin

20/12/03 23:13:59 INFO SecurityManager: Changing view acls groups to:

20/12/03 23:13:59 INFO SecurityManager: Changing modify acls groups to:

20/12/03 23:13:59 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: Set(admin);
groups with view permissions: Set(); users  with modify permissions:
Set(admin); groups with modify permissions: Set()

20/12/03 23:14:00 INFO Utils: Successfully started service 'sparkDriver' on
port 62327.

20/12/03 23:14:00 INFO SparkEnv: Registering MapOutputTracker

20/12/03 23:14:00 INFO SparkEnv: Registering BlockManagerMaster

20/12/03 23:14:01 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information

20/12/03 23:14:01 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up

20/12/03 23:14:01 INFO SparkEnv: Registering BlockManagerMasterHeartbeat

20/12/03 23:14:01 INFO DiskBlockManager: Created local directory at
C:\Users\admin\AppData\Local\Temp\blockmgr-30e2019a-af60-44da-86e7-8a162d1e29da

20/12/03 23:14:01 INFO MemoryStore: MemoryStore started with capacity 434.4
MiB

20/12/03 23:14:01 INFO SparkEnv: Registering OutputCommitCoordinator

20/12/03 23:14:01 INFO Utils: Successfully started service 'SparkUI' on
port 4040.

20/12/03 23:14:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://w7:4040

20/12/03 23:14:01 INFO Executor: Starting executor ID driver on host w7

20/12/03 23:14:01 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 62373.

20/12/03 23:14:01 INFO NettyBlockTransferService: Server created on w7:62373

20/12/03 23:14:01 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy

20/12/03 23:14:01 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, w7, 62373, None)

20/12/03 23:14:01 INFO BlockManagerMasterEndpoint: Registering block
manager w7:62373 with 434.4 MiB RAM, BlockManagerId(driver, w7, 62373, None)

20/12/03 23:14:01 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, w7, 62373, None)

20/12/03 23:14:01 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, w7, 62373, None)

D:\temp\spark\python\lib\pyspark.zip\pyspark\context.py:225:
DeprecationWarning: Support for Python 2 and Python 3 prior to version 3.6
is deprecated as of Spark 3.0. See also the plan for dropping Python 2
support at
https://spark.apache.org/news/plan-for-dropping-python-2-support.html.

  DeprecationWarning)

*20/12/03 23:14:02 INFO SharedState: loading hive config file:
file:/D:/temp/spark/conf/hive-site.xml*

*20/12/03 23:14:02 INFO SharedState: spark.sql.warehouse.dir is not set,
but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to
the value of hive.metastore.warehouse.dir
('C:\Users\admin\PycharmProjects\pythonProject\spark-warehouse').*

*20/12/03 23:14:02 INFO SharedState: Warehouse path is
'C:\Users\admin\PycharmProjects\pythonProject\spark-warehouse'.*

*20/12/03 23:14:04 INFO HiveConf: Found configuration file
file:/D:/temp/spark/conf/hive-site.xml*

*20/12/03 23:14:04 INFO HiveUtils: Initializing HiveMetastoreConnection
version 2.3.7 using Spark classes.*

*Traceback (most recent call last):*

*  File "C:/Users/admin/PycharmProjects/pythonProject/main.py", line 79, in
*

*spark.sql("CREATE DATABASE IF NOT EXISTS test")*

*  File "D:\temp\spark\python\lib\pyspark.zip\pyspark\sql\session.py", line
649, in sql*

*  File
"D:\temp\spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line
1305, in __call__*

*  File 

Re: Kafka structured streaming - how to read headers

2020-12-03 Thread German Schiavon
Hello,

see if this works, from the documentation:


// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Map)]
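
If you then want to read the headers back inside a map over Rows (as in the
original question), here is a minimal sketch of one way that should work.  It
assumes includeHeaders is set as above; the broker list, topic and app name are
just placeholders.  The "headers" column is an array of structs with a string
key and a binary value, so Seq[Row] is a workable answer for HeaderT:

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("kafka-headers-demo").getOrCreate()
import spark.implicits._

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()

// Each element of "headers" is a struct (key: string, value: binary).
val headerKeys = stream.map { row =>
  val headers = row.getAs[Seq[Row]]("headers")   // i.e. HeaderT = Seq[Row]
  if (headers == null) ""                        // headers may be absent
  else headers.map(h => h.getAs[String]("key")).mkString(",")
}

Whatever the map returns needs an encoder (hence the spark.implicits._ import);
this sketch just returns the header keys as a comma-separated string.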


On Thu, 3 Dec 2020 at 18:22,  wrote:

> Hi folks,
>   I am trying to read the message headers from a Kafka structured stream
> which should be stored in a column named ``headers``.
> I try something like this:
>
> val stream = sparkSession.readStream.format("kafka")..load()
>
> stream.map(row => {
>
>  ...
>
>  val headers = row.getAs[HeaderT]("headers")
>
> 
>
> })
>
>
> My question is: what would be *HeaderT*?
>
> Thanks in advance
>
>  Eugen
>


Kafka structured streaming - how to read headers

2020-12-03 Thread eugen . wintersberger
Hi folks,
  I am trying to read the message headers from a Kafka structured
stream which should be stored in a column named ``headers``. 
I try something like this:

val stream = sparkSession.readStream.format("kafka")..load()
stream.map(row => { 
 ...
 val headers = row.getAs[HeaderT]("headers")

})

My question is: what would be HeaderT?

Thanks in advance

 Eugen


Re: [EXTERNAL] - Re: Spark ML / ALS question

2020-12-03 Thread Steve Pruitt
Thanks, I confused myself.  I was looking at the
org.apache.spark.ml.recommendation.ALS Javadoc.  Not sure why train() shows up
there.  I didn't notice the Developer API tag, so "fit" it is!

-S

From: Sean Owen 
Sent: Wednesday, December 2, 2020 3:51 PM
To: Steve Pruitt 
Cc: user@spark.apache.org 
Subject: [EXTERNAL] - Re: Spark ML / ALS question

There is only a fit() method in spark.ml's ALS:
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/ml/recommendation/ALS.html

The older spark.mllib interface has a train() method. You'd generally use the
spark.ml version.

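For illustration, here is a minimal sketch of the spark.ml Estimator pattern
described above (the app name, column names and the tiny in-memory ratings set
are made up for the example):

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("als-fit-demo").getOrCreate()
import spark.implicits._

// Hypothetical toy ratings: (user, item, rating)
val ratings = Seq((0, 10, 4.0), (0, 20, 1.0), (1, 10, 5.0), (1, 30, 3.0))
  .toDF("userId", "itemId", "rating")

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(5)
  .setRegParam(0.1)

// fit() is the Estimator entry point; it returns an ALSModel (a Transformer).
val model = als.fit(ratings)
val predictions = model.transform(ratings)
predictions.show()
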
On Wed, Dec 2, 2020 at 2:13 PM Steve Pruitt  
wrote:
I am having a little difficulty finding information on the ALS train(…) method
in spark.ml.  It's unclear when to use it.  In the Javadoc, the parameters are
undocumented.

What is the difference between train(..) and fit(..)?  When would you use one or
the other?


-S



Re: Structured Streaming Checkpoint Error

2020-12-03 Thread German Schiavon
Thanks Jungtaek!

It makes sense, we are currently changing to an HDFS-Compatible FS, I was
wondering how this change would impact the checkpoint, but after what you
said it is more clear now.
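
For reference, here is a minimal sketch of what the query looks like once the
checkpoint moves to an HDFS-compatible filesystem.  The paths, host names and
the rate source are placeholders; in the real job the source is Kafka with
aggregations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-demo").getOrCreate()

// Placeholder source standing in for the Kafka stream + aggregations.
val aggregated = spark.readStream.format("rate").load()

val query = aggregated.writeStream
  .format("parquet")
  // The output data itself can stay on S3 ...
  .option("path", "s3a://my-bucket/output/")
  // ... while the checkpoint lives on an HDFS-compatible filesystem, where
  // the temp-file rename is atomic and concurrent writers are fenced off.
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/validation")
  .start()

query.awaitTermination()

Only the checkpointLocation option has to change; the sink path can keep
pointing at S3.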



On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim 
wrote:

> In theory it would work, but checkpointing would be very inefficient. If
> I understand correctly, it will write the content to a temp file on S3,
> and the "rename" then actually reads that temp file back from S3 and writes
> its content to the final path on S3. Compared to checkpointing on HDFS,
> that is one unnecessary write and one unnecessary read. It probably warrants
> a custom implementation of the checkpoint manager for S3.
>
> Also, atomic rename still doesn't work on S3, and S3 doesn't support
> writes with overwrite=false. That said, there is no barrier if concurrent
> streaming queries access the same checkpoint, so they can mess it up.
> With a checkpoint on HDFS, the rename is atomic and only one writer succeeds
> even in parallel; the other query, having lost the race to write the
> checkpoint file, simply fails. That's a caveat you may want to keep in mind.
>
> On Wed, Dec 2, 2020 at 11:35 PM German Schiavon 
> wrote:
>
>> Hello!
>>
>> @Gabor Somogyi  I wonder whether, now that S3 is *strongly
>> consistent*, it would work fine.
>>
>>
>> Regards!
>>
>> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>>
>> On Thu, 17 Sep 2020 at 11:55, German Schiavon 
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> Makes sense, thanks a lot!
>>>
>>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
>>> wrote:
>>>
 Hi,

 Structured Streaming is simply not working when the checkpoint location is
 on S3, due to its read-after-write consistency.
 Please choose an HDFS-compliant filesystem and it will work like a
 charm.

 BR,
 G


 On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
 gschiavonsp...@gmail.com> wrote:

> Hi!
>
> I have a Structured Streaming application that reads from Kafka,
> performs some aggregations and writes to S3 in parquet format.
>
> Everything seems to work great except that from time to time I get a
> checkpoint error. At the beginning I thought it was a random error, but it
> has happened more than 3 times already in a few days.
>
> Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>
>
> Does this happen to anyone else?
>
> Thanks in advance.
>
> *This is the full error :*
>
> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>
> java.io.FileNotFoundException: No such file or directory:
> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>
> at
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>
> at
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>
> at
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>
> at
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>
> at
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
> at scala.Option.getOrElse(Option.scala:189)
>