Unsubscribe

2023-07-17 Thread Bode, Meikel
Unsubscribe


Unsubscribe

2023-07-16 Thread Bode, Meikel
Unsubscribe


RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-21 Thread Bode, Meikel, NM-X-DS
Hello Juan Liu,

The release process is well documented (see last step on announcement):
https://spark.apache.org/release-process.html

To (un)subscribe to the mailing lists see:
https://spark.apache.org/community.html

Best,
Meikel

Meikel Bode, MSc
Senior Manager | Head of SAP Data Platforms & Analytics
-
Postal address:
Arvato Systems GmbH
Reinhard-Mohn-Straße 200
3 Gütersloh
Germany

Visitor address:
Arvato Systems GmbH
Fuggerstraße 11
33689 Bielefeld
Germany

Phone: +49(5241)80-89734
Mobile: +49(151)14774185
E-Mail: meikel.b...@bertelsmann.de<mailto:meikel.b...@bertelsmann.de>
arvato-systems.de<https://www.arvato-systems.de/>



From: Juan Liu 
Sent: Donnerstag, 20. Januar 2022 09:44
To: Bode, Meikel, NM-X-DS 
Cc: sro...@gmail.com; Theodore J Griesenbrock ; 
user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Hi Meikel, would you please help to add both of us (t...@ibm.com, liuj...@cn.ibm.com) to the mailing list user@spark.apache.org? Thanks!
Juan Liu (刘娟) PMP®
Release Manager, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com<mailto:liuj...@cn.ibm.com>
Mobile: 86-13521258532





From:"Bode, Meikel, NM-X-DS" 
mailto:meikel.b...@bertelsmann.de>>
To:"Theodore J Griesenbrock" mailto:t...@ibm.com>>, 
"sro...@gmail.com<mailto:sro...@gmail.com>" 
mailto:sro...@gmail.com>>
Cc:"Juan Liu" mailto:liuj...@cn.ibm.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
mailto:user@spark.apache.org>>
Date:2022/01/20 03:05 PM
Subject:[EXTERNAL] RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and 
how? your target release day for Spark3.3?





Hi,



New releases are announced via mailing lists 
user@spark.apache.org<mailto:user@spark.apache.org>& 
d...@spark.apache.org<mailto:d...@spark.apache.org>.



Best,

Meikel



From: Theodore J Griesenbrock <t...@ibm.com>
Sent: Mittwoch, 19. Januar 2022 18:50
To: sro...@gmail.com
Cc: Juan Liu <liuj...@cn.ibm.com>; user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?




Again, sorry to bother you.



What is the best option available to ensure we get notified when a new version 
is released for Apache Spark?  I do not see any RSS feeds, nor do I see any 
e-mail subscription option for this page:  
https://spark.apache.org/news/index.html



Please let me know what we can do to ensure we stay up to date with the news.



Thanks!



-T.J.





T.J. Griesenbrock

Technical Release Manager

Watson Health

He/Him/His



+1 (602) 377-7673 (Text only)
t...@ibm.com

IBM





- Original message -
From: "Sean Owen" mailto:sro...@gmail.com>>
To: "Juan Liu" mailto:liuj...@cn.ibm.com>>
Cc: "Theodore J Griesenbrock" mailto:t...@ibm.com>>, "User" 
mailto:user@spark.apache.org>>
Subject: [EXTERNAL] Re: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? 
your target release day for Spark3.3?
Date: Thu, Jan 13, 2022 08:05

Yes, Spark does not use the SocketServer mentioned in CVE-2019-17571, so it is not affected.

3.3.0 would probably be out in a couple months.



On Thu, Jan 13, 2022 at 3:14 AM Juan Liu <liuj...@cn.ibm.com> wrote:

We are info

RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-19 Thread Bode, Meikel, NM-X-DS
Hi,

New releases are announced via mailing lists 
user@spark.apache.org & 
d...@spark.apache.org.

Best,
Meikel

From: Theodore J Griesenbrock 
Sent: Mittwoch, 19. Januar 2022 18:50
To: sro...@gmail.com
Cc: Juan Liu ; user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Again, sorry to bother you.

What is the best option available to ensure we get notified when a new version 
is released for Apache Spark?  I do not see any RSS feeds, nor do I see any 
e-mail subscription option for this page:  
https://spark.apache.org/news/index.html

Please let me know what we can do to ensure we stay up to date with the news.

Thanks!

-T.J.


T.J. Griesenbrock
Technical Release Manager
Watson Health
He/Him/His

+1 (602) 377-7673 (Text only)
t...@ibm.com

IBM


- Original message -
From: "Sean Owen" mailto:sro...@gmail.com>>
To: "Juan Liu" mailto:liuj...@cn.ibm.com>>
Cc: "Theodore J Griesenbrock" mailto:t...@ibm.com>>, "User" 
mailto:user@spark.apache.org>>
Subject: [EXTERNAL] Re: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? 
your target release day for Spark3.3?
Date: Thu, Jan 13, 2022 08:05

Yes, Spark does not use the SocketServer mentioned in CVE-2019-17571, so it is not affected.
3.3.0 would probably be out in a couple months.

On Thu, Jan 13, 2022 at 3:14 AM Juan Liu <liuj...@cn.ibm.com> wrote:
We are informed that CVE-2021-4104 is not the only problem with Log4j 1.x. There is one more, CVE-2019-17571, and as Apache announced its EOL in 2015, Spark 3.3.0 is very much expected. Do you think mid-2022 is a reasonable time for the Spark 3.3.0 release?

Juan Liu (刘娟) PMP®




Release Management, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com
Phone: 86-10-82452506













To unsubscribe e-mail: user-unsubscr...@spark.apache.org


RE: Conda Python Env in K8S

2021-12-06 Thread Bode, Meikel, NMA-CFD
Hi Mich,

Thanks for your response. Yes, the --py-files option works. I also tested it.
The question is why the --archives option doesn't.

From Jira I can see that it should be available since 3.1.0:

https://issues.apache.org/jira/browse/SPARK-33530
https://issues.apache.org/jira/browse/SPARK-33615

Best,
Meikel


From: Mich Talebzadeh 
Sent: Samstag, 4. Dezember 2021 18:36
To: Bode, Meikel, NMA-CFD 
Cc: dev ; user@spark.apache.org
Subject: Re: Conda Python Env in K8S



Hi Meikel



In the past I tried with


   --py-files hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/DSBQ.zip \
   --archives 
hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/pyspark_venv.zip#pyspark_venv \


which is basically what you are doing. The first line (--py-files) works, but the second one fails.



It tries to unpack them:



Unpacking an archive hdfs://50.140.197.220:9000/minikube/codes/pyspark_venv.zip#pyspark_venv from /tmp/spark-502a5b57-0fe6-45bd-867d-9738e678e9a3/pyspark_venv.zip to /opt/spark/work-dir/./pyspark_venv



But it failed.



This could be due to creating the virtual environment inside the Docker container in the work-dir, or to not having enough available memory to gunzip and untar the file, especially if your executors are built on cluster nodes with less memory than the driver node.



However, the most convenient way to add additional packages to the Docker image is to add them directly when the image is created. External packages are then bundled as part of my Docker image because it is fixed, and if an application requires that set of dependencies every time, they are there. Also note that every RUN statement creates an intermediate container and hence increases build time, so reduce it as follows:

RUN pip install pyyaml numpy cx_Oracle --no-cache-dir

The --no-cache-dir option to pip prevents the downloaded binaries from being added to the image, reducing the image size. It is also advisable to install all packages in one line, since every RUN statement creates an intermediate container and hence increases the build time.

Log in to the docker image and check for Python packages installed

docker run -u 0 -it 
spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages bash

root@5bc049af7278:/opt/spark/work-dir# pip list

Package    Version
---------- -------
cx-Oracle  8.3.0
numpy      1.21.4
pip        21.3.1
PyYAML     6.0
setuptools 59.4.0
wheel      0.34.2
HTH

 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Sat, 4 Dec 2021 at 07:52, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hi Mich,

Sure, that's possible. But distributing the complete env would be more practical.
A workaround at the moment is that we build different environments, store them in a PV, mount it into the pods, and refer from the SparkApplication resource to the desired env.

But actually these options exist and I want to understand what the issue is...
Any hints on that?

Best,
Meikel

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Freitag, 3. Dezember 2021 13:27
To: Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de>
Cc: dev <d...@spark.apache.org>; user@spark.apache.org
Subject: Re: Conda Python Env in K8S

Build python packages into the docker image itself first with pip install

RUN pip install pandas ... --no-cache-dir

HTH

On Fri, 3 Dec 2021 at 11:58, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hello,

I am trying to run s

RE: Conda Python Env in K8S

2021-12-03 Thread Bode, Meikel, NMA-CFD
Hi Mich,

Sure, that's possible. But distributing the complete env would be more practical.
A workaround at the moment is that we build different environments, store them in a PV, mount it into the pods, and refer from the SparkApplication resource to the desired env.

But actually these options exist and I want to understand what the issue is...
Any hints on that?

Best,
Meikel

From: Mich Talebzadeh 
Sent: Freitag, 3. Dezember 2021 13:27
To: Bode, Meikel, NMA-CFD 
Cc: dev ; user@spark.apache.org
Subject: Re: Conda Python Env in K8S

Build python packages into the docker image itself first with pip install

RUN pip install pandas ... --no-cache-dir

HTH

On Fri, 3 Dec 2021 at 11:58, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hello,

I am trying to run spark jobs using Spark Kubernetes Operator.
But when I try to bundle a conda Python environment using the following resource description, the Python interpreter is only unpacked on the driver and not on the executors.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: ...
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  mainApplicationFile: local:///path/script.py
..
  sparkConf:
"spark.archives": "local:///path/conda-env.tar.gz#environment"
"spark.pyspark.python": "./environment/bin/python"
"spark.pyspark.driver.python": "./environment/bin/python"


The driver is unpacking the archive and the Python script gets executed.
On the executors there is no log message indicating that the archive gets unpacked.
Executors then fail as they can't find the Python executable at the given location "./environment/bin/python".

Any hint?

Best,
Meikel
--




 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




Conda Python Env in K8S

2021-12-03 Thread Bode, Meikel, NMA-CFD
Hello,

I am trying to run spark jobs using Spark Kubernetes Operator.
But when I try to bundle a conda Python environment using the following resource description, the Python interpreter is only unpacked on the driver and not on the executors.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: ...
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  mainApplicationFile: local:///path/script.py
..
  sparkConf:
"spark.archives": "local:///path/conda-env.tar.gz#environment"
"spark.pyspark.python": "./environment/bin/python"
"spark.pyspark.driver.python": "./environment/bin/python"


The driver is unpacking the archive and the Python script gets executed.
On the executors there is no log message indicating that the archive gets unpacked.
Executors then fail as they can't find the Python executable at the given location "./environment/bin/python".

Any hint?

Best,
Meikel


RE: [issue] not able to add external libs to pyspark job while using spark-submit

2021-11-24 Thread Bode, Meikel, NMA-CFD
Can we add Python dependencies as we can do for mvn coordinates? So that we run something like pip install  or download from the PyPI index?

From: Mich Talebzadeh 
Sent: Mittwoch, 24. November 2021 18:28
Cc: user@spark.apache.org
Subject: Re: [issue] not able to add external libs to pyspark job while using 
spark-submit

The easiest way to set this up is to create a dependencies.zip file.

Assuming that you have a virtual environment already set up, where there is a directory called site-packages, go to that directory and just create a minimal shell script, say package_and_zip_dependencies.sh, to do it for you:

Example:

cat package_and_zip_dependencies.sh

#!/bin/bash
# 
https://blog.danielcorin.com/posts/2015-11-09-pyspark/
zip -r ../dependencies.zip .
ls -l ../dependencies.zip
exit 0

Once created, create an environment variable called DEPENDENCIES:

export DEPENDENCIES="export 
DEPENDENCIES="/usr/src/Python-3.7.3/airflow_virtualenv/lib/python3.7/dependencies.zip"

Then in spark-submit you can do this

spark-submit --master yarn --deploy-mode client --driver-memory xG 
--executor-memory yG --num-executors m --executor-cores n --py-files 
$DEPENDENCIES --jars $HOME/jars/spark-sql-kafka-0-10_2.12-3.1.0.jar

Also check this link: https://blog.danielcorin.com/posts/2015-11-09-pyspark/

HTH



 
view my Linkedin profile



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 24 Nov 2021 at 14:03, Atheer Alabdullatif <a.alabdulla...@lean.sa> wrote:
Dear Spark team,
hope my email finds you well



I am using PySpark 3.0 and facing an issue with adding an external library [configparser] while running the job using [spark-submit] & [yarn].

issue:



import configparser

ImportError: No module named configparser

21/11/24 08:54:38 INFO util.ShutdownHookManager: Shutdown hook called

solutions I tried:

1- installing library src files and adding it to the session using [addPyFile]:

  *   files structure:

-- main dir

   -- subdir

  -- libs

 -- configparser-5.1.0

-- src

   -- configparser.py

 -- configparser.zip

  -- sparkjob.py

1.a zip file:

spark = SparkSession.builder.appName(jobname + '_' + table).config(
    "spark.mongodb.input.uri", uri + "." + table + "").config(
    "spark.mongodb.input.sampleSize", 990).getOrCreate()

spark.sparkContext.addPyFile('/maindir/subdir/libs/configparser.zip')

df = spark.read.format("mongo").load()

1.b python file

spark = SparkSession.builder.appName(jobname + '_' + table).config(
    "spark.mongodb.input.uri", uri + "." + table + "").config(
    "spark.mongodb.input.sampleSize", 990).getOrCreate()

spark.sparkContext.addPyFile('maindir/subdir/libs/configparser-5.1.0/src/configparser.py')

df = spark.read.format("mongo").load()



2- using os library

def install_libs():
    '''
    this function used to install external python libs in yarn
    '''
    os.system("pip3 install configparser")


if __name__ == "__main__":

    # install libs
    install_libs()



we value your support

best,

Atheer Alabdullatif



*Confidentiality and Disclaimer Notice*
This message and its attachments are intended solely for the use of the intended recipient and may contain confidential or legally protected information. If you are not the intended person, please notify the sender immediately by replying to

RE: HiveThrift2 ACID Transactions?

2021-11-11 Thread Bode, Meikel, NMA-CFD
Hi all,

I now have some more input related to the issues I face at the moment:

When I try to UPDATE an external table via JDBC connection to HiveThrift2 
server I get the following exception:

java.lang.UnsupportedOperationException: UPDATE TABLE is not supported 
temporarily.

When doing a DELETE I see:

org.apache.spark.sql.AnalysisException: DELETE is only supported with v2 tables.

INSERT is working as expected.

We are using Spark 3.1.2 with Hadoop 3.2.0 and an external Hive 3.0.0 metastore 
on K8S.
Warehouse dir is located at AWS s3 attached using protocol s3a.

I learned so far that we need to use an ACID-compatible file format for external tables, such as ORC or Delta.
In addition to that, we would need to set some ACID-related properties, either as the first commands after session creation or via appropriate configuration files:

SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.enforce.sorting=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

Now, when I try to create the following table:

create external table acidtab (id string, val string)
stored as ORC location '/data/acidtab.orc'
tblproperties ('transactional'='true');

I see the following exception:

org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:The 
table must be stored using an ACID compliant format (such as ORC): 
default.acidtab)

Even when I try to create the table in ORC format, the exception suggests using ORC as it is required for ACID compliance.
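
For comparison, a rough sketch of a different route that does accept UPDATE and DELETE through Spark SQL: a v2 table format such as Delta Lake. This swaps out the Hive ACID/ORC approach entirely, and assumes the delta-core package matching the Spark version is on the classpath; the table location is hypothetical:

from pyspark.sql import SparkSession

# Delta Lake needs its SQL extension and catalog registered (assumption:
# the matching delta-core jar is available to driver and executors).
spark = (SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Hypothetical table and S3 location, mirroring the example above.
spark.sql("""
    CREATE TABLE IF NOT EXISTS acidtab (id STRING, val STRING)
    USING delta
    LOCATION 's3a://my-bucket/data/acidtab'
""")
spark.sql("INSERT INTO acidtab VALUES ('1', 'a')")
spark.sql("UPDATE acidtab SET val = 'b' WHERE id = '1'")  # supported on Delta tables
spark.sql("DELETE FROM acidtab WHERE id = '1'")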

Another point is that external tables are not getting deleted via the DROP TABLE command. They are only removed from the metastore, but they remain physically present in their S3 bucket.

I tried with:

SET `hive.metastore.thrift.delete-files-on-drop`=true;

And also by setting:

TBLPROPERTIES ('external.table.purge'='true')


Any help on these issues would be very appreciated!

Many thanks,
Meikel Bode

From: Bode, Meikel, NMA-CFD 
Sent: Mittwoch, 10. November 2021 08:23
To: user ; dev 
Subject: HiveThrift2 ACID Transactions?

Hi all,

We want to apply INSERT, UPDATE, and DELETE operations on tables based on Parquet or ORC files served by Thrift2.
Actually it's unclear whether we can enable them and where.

At the moment, when executing UPDATE or DELETE operations, those are getting blocked.

Anyone out there who uses ACID transactions in combination with Thrift2?

Best,
Meikel


HiveThrift2 ACID Transactions?

2021-11-09 Thread Bode, Meikel, NMA-CFD
Hi all,

We want to apply INSERT, UPDATE, and DELETE operations on tables based on Parquet or ORC files served by Thrift2.
Actually it's unclear whether we can enable them and where.

At the moment, when executing UPDATE or DELETE operations, those are getting blocked.

Anyone out there who uses ACID transactions in combination with Thrift2?

Best,
Meikel


3.1.2 Executor Initialization fails due to dep copy failure

2021-11-08 Thread Bode, Meikel, NMA-CFD
Hi all,

I am trying to get Thrift2 on Spark 3.1.2 running on K8S with one executor for the moment. This works so far, but it fails on the executor side during initialization.
The issue seems to be related to access restrictions on certain directories... but I am not sure. Please see the errors below (originally highlighted in yellow).

The AWS SDK dependency was provided via the "--jars" option, as the "--packages" option failed because I was unable to get Ivy to use "/opt/spark/work-dir" or "/tmp" as the cache dir:

#  - --packages
#  - 
com.amazonaws:aws-java-sdk-s3:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0
  - --jars
  - 
local:///opt/spark/jars/aws-java-sdk-1.11.375.jar,local:///opt/spark/jars/hadoop-aws-3.2.0.jar


Executor Log:
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -z ']'
+ '[' -z ']'
+ '[' -n /opt/hadoop ']'
+ '[' -z '' ']'
++ /opt/hadoop/bin/hadoop classpath
+ export 
'SPARK_DIST_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*'
+ 
SPARK_DIST_CLASSPATH='/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*'
+ '[' -z ']'
+ '[' -z x ']'
+ SPARK_CLASSPATH='/opt/spark/conf::/opt/spark/jars/*'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" 
-Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp 
"$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
$SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores 
$SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname 
$SPARK_EXECUTOR_POD_IP --resourceProfileId $SPARK_RESOURCE_PROFILE_ID)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java 
-Dspark.hadoop.hive.server2.thrift.port=1 -Dspark.driver.port= 
-Dspark.driver.blockManager.port= -Xms1024m -Xmx1024m -cp 
'/opt/spark/conf::/opt/spark/jars/*:/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*'
 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
spark://CoarseGrainedScheduler@spark-thrift-server-internal: --executor-id 
24 --cores 1 --app-id spark-application-1636134793639 --hostname 10.1.16.66 
--resourceProfileId 0
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: ./fairscheduler-statedump.log (Permission denied)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.(FileOutputStream.java:213)
at java.io.FileOutputStream.(FileOutputStream.java:133)
at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
at 
org.apache.log4j.RollingFileAppender.setFile(RollingFileAppender.java:207)
at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
at 
org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at 
org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at 
org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at 
org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at 
org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at 
org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
at 
org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
at 
org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at 
org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66)
at 

RE: [ANNOUNCE] Apache Spark 3.2.0

2021-10-19 Thread Bode, Meikel, NMA-CFD
Many thanks! 

From: Gengliang Wang 
Sent: Dienstag, 19. Oktober 2021 16:16
To: dev ; user 
Subject: [ANNOUNCE] Apache Spark 3.2.0

Hi all,

Apache Spark 3.2.0 is the third release of the 3.x line. With tremendous 
contribution from the open-source community, this release managed to resolve in 
excess of 1,700 Jira tickets.

We'd like to thank our contributors and users for their contributions and early 
feedback to this release. This release would not have been possible without you.

To download Spark 3.2.0, head over to the download page: 
https://spark.apache.org/downloads.html

To view the release notes: 
https://spark.apache.org/releases/spark-release-3-2-0.html


RE: spark thrift server as hive on spark running on kubernetes, and more.

2021-09-10 Thread Bode, Meikel, NMA-CFD
Hi,

thx. Great work. Will test it 

Best,
Meikel Bode

From: Kidong Lee 
Sent: Freitag, 10. September 2021 01:39
To: user@spark.apache.org
Subject: spark thrift server as hive on spark running on kubernetes, and more.

Hi,

Recently, I have open-sourced a tool called DataRoaster (https://github.com/cloudcheflabs/dataroaster) to provide data platforms running on Kubernetes with ease.
In particular, with DataRoaster you can deploy Spark Thrift Server on Kubernetes easily, which originated from my blog post https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1.
In addition to Spark Thrift Server as Hive on Spark, there are several components provided by DataRoaster, for instance Hive metastore, Trino, Redash, JupyterHub, and Kafka.

To use DataRoaster,
- visit https://github.com/cloudcheflabs/dataroaster
- see also the demo: https://github.com/cloudcheflabs/dataroaster#dataroaster-demo

Thank you.

- Kidong Lee.


RE: K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
On EKS...

From: Mich Talebzadeh 
Sent: Donnerstag, 12. August 2021 15:47
To: Bode, Meikel, NMA-CFD 
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Ok

As I see it, with PySpark even if it is submitted in cluster mode, it will be converted to client mode anyway.


Are you running this on AWS or GCP?


 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 12 Aug 2021 at 12:42, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hi Mich,

All PySpark.

Best,
Meikel

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Donnerstag, 12. August 2021 13:41
To: Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de>
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Is this Spark or PySpark?





 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hi all,

If we schedule a Spark job on K8S, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually in the pod template, while executor volumes are attached dynamically based on submit parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from the submit command and attached to the pods accordingly. Right...?

Any hints appreciated,

Best,
Meikel


RE: K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
Hi Mich,

All PySpark.

Best,
Meikel

From: Mich Talebzadeh 
Sent: Donnerstag, 12. August 2021 13:41
To: Bode, Meikel, NMA-CFD 
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Is this Spark or PySpark?





 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hi all,

If we schedule a Spark job on K8S, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually in the pod template, while executor volumes are attached dynamically based on submit parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from the submit command and attached to the pods accordingly. Right...?

Any hints appreciated,

Best,
Meikel


K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
Hi all,

If we schedule a Spark job on K8S, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually in the pod template, while executor volumes are attached dynamically based on submit parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from the submit command and attached to the pods accordingly. Right...?
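
For context, a rough sketch of the spark.kubernetes.*.volumes.* options involved (claim name and mount path are hypothetical). The executor settings apply in both modes, while the driver settings only take effect when spark-submit creates the driver pod, i.e. in cluster mode; a client-mode driver pod needs its volumes declared in its own pod spec:

from pyspark.sql import SparkSession

# Hypothetical PVC name ("data-pvc") and mount path ("/data").
spark = (SparkSession.builder
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "data-pvc")
    # driver-side counterparts, only effective when spark-submit creates the driver pod (cluster mode)
    .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", "/data")
    .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", "data-pvc")
    .getOrCreate())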

Any hints appreciated,

Best,
Meikel


Parquet Metadata

2021-06-23 Thread Bode, Meikel, NMA-CFD
Hi folks,

Maybe not the right audience, but maybe you have come across such a requirement.
Is it possible to define a Parquet schema that contains technical column names and a list of translations of a column name into different languages?

I give an example:
Technical: "custnr" would translate to { EN:"Customer Number",  DE: 
"Kundennummer"}

We could of course deliver a metadata file containing such language mappings, but our question is whether we could embed that info into the Parquet metadata?
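
One possible route, sketched under the assumption that writing the files with PyArrow (rather than Spark) is acceptable: Parquet supports key/value metadata at schema and field level, so the translations can ride along with the technical column name. Paths and sample values below are made up:

import pyarrow as pa
import pyarrow.parquet as pq

# Attach the translations as field-level key/value metadata.
fields = [
    pa.field("custnr", pa.string(),
             metadata={"EN": "Customer Number", "DE": "Kundennummer"}),
]
table = pa.table({"custnr": ["0815", "4711"]}, schema=pa.schema(fields))
pq.write_table(table, "/tmp/customers.parquet")  # hypothetical path

# Reading the metadata back (values are returned as bytes).
schema = pq.read_schema("/tmp/customers.parquet")
print(schema.field("custnr").metadata)

Whether a given reader surfaces this field metadata varies, so it would need a quick test with the consuming tools.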

Thanks a lot,
Meikel



DF blank value fill

2021-05-21 Thread Bode, Meikel, NMA-CFD
Hi all,

My df looks like follows:

Situation:
MainKey, SubKey, Val1, Val2, Val3, ...
1, 2, a, null, c
1, 2, null, null, c
1, 3, null, b, null
1, 3, a, null, c


Desired outcome:
1, 2, a, b, c
1, 2, a, b, c
1, 3, a, b, c
1, 3, a, b, c


How could I populate/synchronize empty cells of all records with the same 
combination of MainKey and SubKey with the respective value of other rows with 
the same key combination?
A certain value, if not null, of a col is guaranteed to be unique within the 
df. If a col exists then there is at least one row with a not-null value.

I am using pyspark.
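
For reference, a minimal sketch (column names taken from the example above) using first(..., ignorenulls=True) over a window. It partitions by the MainKey/SubKey combination as described; the window keys would need to be widened if the fill should span more rows, as the desired outcome for Val2 suggests:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 2, "a", None, "c"),
     (1, 2, None, None, "c"),
     (1, 3, None, "b", None),
     (1, 3, "a", None, "c")],
    ["MainKey", "SubKey", "Val1", "Val2", "Val3"])

# first(..., ignorenulls=True) over the key window picks the non-null value
# per group and propagates it to every row of that group.
w = Window.partitionBy("MainKey", "SubKey")
filled = df.select(
    "MainKey", "SubKey",
    *[F.first(c, ignorenulls=True).over(w).alias(c) for c in ["Val1", "Val2", "Val3"]])
filled.show()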

Thanks for any hint,
Best
Meikel


RE: Thrift2 Server on Kubernetes?

2021-05-16 Thread Bode, Meikel, NMA-CFD
Hi Kidong Lee,

Thank you for your email. Actually I came across your blog and it seems to be very complete.
As you write that it's not easy to bring Spark Thrift2 to K8S, and because you had to write your own wrapper, I have the impression that it is not really officially supported, despite the fact that it works.
My question aims more at official support, as with Spark 3.1.1 we are now officially on K8S.
Do you have any info on that?

Thanks and all the best,
Meikel

PS: I will give your solution a try anyway 

-Original Message-
From: mykidong  
Sent: Freitag, 14. Mai 2021 14:12
To: user@spark.apache.org
Subject: Re: Thrift2 Server on Kubernetes?

Hi Meikel,

If you want to run Spark Thrift Server on Kubernetes, take a look at my blog
post: https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

Cheers,

- Kidong Lee.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Thrift2 Server on Kubernetes?

2021-05-14 Thread Bode, Meikel, NMA-CFD
Hi all,

We are migrating to K8S and I wonder whether there are already "good practices" to run Thrift2 on K8S?

Best,
Meikel


Broadcast Variable

2021-05-03 Thread Bode, Meikel, NMA-CFD
Hi all,

when broadcasting a large dict containing several million entries to executors 
what exactly happens when calling bc_var.value within a UDF like:

..
d = bc_var.value
..

Does d receive a copy of the dict inside value, or is this handled like a pointer?
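
For what it is worth, a small sketch of how I understand the behaviour (worth verifying, not a definitive statement): the dict is deserialized once per Python worker process, and .value then returns a reference to that cached object, so d = bc_var.value does not copy it on every call:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# a stand-in for the real multi-million-entry dict
bc = sc.broadcast({i: str(i) for i in range(100_000)})

def probe(_):
    d1 = bc.value   # loads/deserializes the dict on first access in this worker
    d2 = bc.value   # returns the same cached object again
    yield (d1 is d2, len(d1))

# every partition should report (True, 100000)
print(sc.parallelize(range(4), 4).mapPartitions(probe).collect())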

Thanks,
Meikel


RE: Recursive Queries or Recursive UDF?

2021-05-01 Thread Bode, Meikel, NMA-CFD
Hi all,

I created a running example of my data set and I describe what I want to achieve. The idea is to create a view over the resulting table and use it for later joins, instead of applying a UDF to a column using a dict with 20+ (growing) million records.

Example data set:

spark.createDataFrame(
[
("inquiry1", "quotation1"),

("inquiry2", "quotation2"),
("quotation2", "order2"),
("order2", "invoice2"),

("order3", "invoice3")
],
['parent', 'child']
).createOrReplaceTempView("hierarchy")

We see several hierarchies in the df above but we don’t have records indicating 
that e.g. inquiry1 is the root of one of the hierarchies.
So we have:

1: inquiry1 > quotation1
2: inquiry2 > quotation2 > order2
3: order3 > invoice3

What I need is the following. For every child I need the level 0 parent like 
this:

child, lvl-0-parent
quotation1, inquiry1
quotation2, inquiry2
order2, inquiry2
invoice2, inquiry2
invoice3, order3

It would be perfect to see that some of the entries actually are the root by 
indicating:
child, lvl-0-parent
inquiry1, null
inquiry2, null
order3, null

Actually that’s what I realized with my recursive UDF I put into the initial 
post.
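
For completeness, a rough sketch of how the same lookup could be done with plain DataFrame joins instead of the UDF, assuming the hierarchies are shallow enough for a fixed number of iterations (as far as I know, Spark SQL has no WITH RECURSIVE support):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

edges = spark.createDataFrame(
    [("inquiry1", "quotation1"),
     ("inquiry2", "quotation2"), ("quotation2", "order2"), ("order2", "invoice2"),
     ("order3", "invoice3")],
    ["parent", "child"])

# Start with each child's direct parent as the root candidate, then repeatedly
# replace the candidate by its own parent until no further parent exists.
result = edges.select("child", F.col("parent").alias("lvl0"))
for _ in range(10):  # assumed upper bound on the hierarchy depth
    result = (result.alias("r")
              .join(edges.alias("e"), F.col("r.lvl0") == F.col("e.child"), "left")
              .select(F.col("r.child").alias("child"),
                      F.coalesce(F.col("e.parent"), F.col("r.lvl0")).alias("lvl0")))

result.show()  # child -> level-0 parent, e.g. invoice2 -> inquiry2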

Thank you for any hints on that issue! Any hints on the UDF solution are also 
very welcome:

Thx and best,
Meikel

From: Bode, Meikel, NMA-CFD
Sent: Freitag, 30. April 2021 12:16
To: user @spark 
Subject: Recursive Queries or Recursive UDF?

Hi all,

I implemented a recursive UDF, that tries to find a document number in a long 
list of predecessor documents. This can be a multi-level hierarchy:
C is successor of B is successor of A (but many more levels are possible)

As input to that UDF I prepare a dict that contains the complete document flow 
reduced to the required fields to follow the path back to the originating 
document.
The dict is broadcasted and then used by the UDF. Actually this approach is very slow and now, as data grows, it kills my executors regularly, so that RDDs get lost and tasks fail. Sometimes also the workers (docker containers) become unresponsive and are getting killed.

Here is the coding of the methods:

1.: Prepare and define the UDF, broadcast dict.

# Define function for recursive lookup of root document
def __gen_caseid_udf_sales_document_flow(self):
global bc_document_flow, udf_sales_document_flow

# Prepare docflow for broadcasting by only selecting required fields
df_vbfa_subset = self.spark.table("FLOWTABLE").select("clnt", 
"predecessor_head", "predecessor_item", "doc_num", "doc_item")

# Prepare dictionary for broadcast
document_flow_dic = {}
for clnt, predecessor_head, predecessor_item, doc_num, doc_item in 
df_subset.rdd.collect():
document_flow_dic[(clnt, doc_num, doc_item)] = predecessor_head, 
predecessor_item

# Broadcast dictionary to workers
bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

# Register new user defined function UDF
udf_vbfa_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)


2.: The recursive function used in the UDF
# Find root document
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
global bc_document_flow

if not clnt or not docnr or not posnr:
return None, None

key = clnt, docnr, posnr

if key in lt:
docnr_tmp, item_tmp = lt[key]
if docnr_tmp == docnr and item_tmp == posnr:
return docnr, posnr
else:
return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, 
item_tmp)
else:
return docnr, posnr

3: The UDF
# Define udf function to look up root document
def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
global bc_document_flow # Name of the broad cast variable

lt = bc_document_flow.value
h, p = gen_caseid_udf_vbfa_sale_get_root_doc(lt, clnt, doc_num, posnr)
return str(clnt) + str(h) + str(p)

##
4. Usage of the UDF on a DF that might contain several ten thousands of rows:

# Lookup root document from document flow
documents = documents.withColumn("root_doc", 
udf_sales_document_flow(func.col('clnt'),
 
func.col('document_number’),
 
func.col('item_number')))

Do you have any hint on my coding, or are there any ideas how to implement a recursive select without implementing a potentially unoptimizable UDF?
I came across https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL which might be an option; does Spark support this kind of construct?

Thanks and all the best,
Meikel



Recursive Queries or Recursive UDF?

2021-04-30 Thread Bode, Meikel, NMA-CFD
Hi all,

I implemented a recursive UDF, that tries to find a document number in a long 
list of predecessor documents. This can be a multi-level hierarchy:
C is successor of B is successor of A (but many more levels are possible)

As input to that UDF I prepare a dict that contains the complete document flow 
reduced to the required fields to follow the path back to the originating 
document.
The dict is broadcasted and then used  by the UDF. Actually this approach is 
very slow and now - as data growth - it kills my executors regularly so that 
RDDs get lost and task fail. Sometimes also the workers (docker containers) 
become unresponsive and are getting killed.

Here is the coding of the methods:

1.: Prepare and define the UDF, broadcast dict.

# Define function for recursive lookup of root document
def __gen_caseid_udf_sales_document_flow(self):
global bc_document_flow, udf_sales_document_flow

# Prepare docflow for broadcasting by only selecting required fields
df_vbfa_subset = self.spark.table("FLOWTABLE").select("clnt", 
"predecessor_head", "predecessor_item", "doc_num", "doc_item")

# Prepare dictionary for broadcast
document_flow_dic = {}
for clnt, predecessor_head, predecessor_item, doc_num, doc_item in 
df_subset.rdd.collect():
document_flow_dic[(clnt, doc_num, doc_item)] = predecessor_head, 
predecessor_item

# Broadcast dictionary to workers
bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

# Register new user defined function UDF
udf_vbfa_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)


2.: The recursive function used in the UDF
# Find root document
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
global bc_document_flow

if not clnt or not docnr or not posnr:
return None, None

key = clnt, docnr, posnr

if key in lt:
docnr_tmp, item_tmp = lt[key]
if docnr_tmp == docnr and item_tmp == posnr:
return docnr, posnr
else:
return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, 
item_tmp)
else:
return docnr, posnr

3: The UDF
# Define udf function to look up root document
def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
global bc_document_flow # Name of the broad cast variable

lt = bc_document_flow.value
h, p = gen_caseid_udf_vbfa_sale_get_root_doc(lt, clnt, doc_num, posnr)
return str(clnt) + str(h) + str(p)

##
4. Usage of the UDF on a DF that might contain several ten thousands of rows:

# Lookup root document from document flow
documents = documents.withColumn("root_doc", 
udf_sales_document_flow(func.col('clnt'),
 
func.col('document_number'),
 
func.col('item_number')))

Do you have any hint on my coding, or are there any ideas how to implement a recursive select without implementing a potentially unoptimizable UDF?
I came across https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL which might be an option; does Spark support this kind of construct?

Thanks and all the best,
Meikel



AW: [ANNOUNCE] Announcing Apache Spark 3.1.1

2021-03-02 Thread Bode, Meikel, NMA-CFD
Congrats!

From: Hyukjin Kwon 
Sent: Wednesday, March 3, 2021 02:41
To: user @spark ; dev 
Subject: [ANNOUNCE] Announcing Apache Spark 3.1.1

We are excited to announce Spark 3.1.1 today.

Apache Spark 3.1.1 is the second release of the 3.x line. This release adds
Python type annotations and Python dependency management support as part of 
Project Zen.
Other major updates include improved ANSI SQL compliance support, history 
server support
in structured streaming, the general availability (GA) of Kubernetes and node 
decommissioning
in Kubernetes and Standalone. In addition, this release continues to focus on 
usability, stability,
and polish while resolving around 1500 tickets.

We'd like to thank our contributors and users for their contributions and early 
feedback to
this release. This release would not have been possible without you.

To download Spark 3.1.1, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-1-1.html



AW: Issue after change to 3.0.2

2021-02-26 Thread Bode, Meikel, NMA-CFD
Hi Sean.

You are right. We are using Docker images for our Spark cluster. The generation of the worker image did not succeed and therefore the old 3.0.1 image was still in use.

Thanks,
Best,
Meikel

From: Sean Owen 
Sent: Friday, February 26, 2021 10:29
To: Bode, Meikel, NMA-CFD 
Cc: user @spark 
Subject: Re: Issue after change to 3.0.2

That looks to me like you have two different versions of Spark in use somewhere 
here. Like the cluster and driver versions aren't quite the same. Check your 
classpaths?
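
One way to surface such a mismatch from PySpark, sketched under the assumption that the worker images ship a matching pyspark package (so the Python package version tracks the jar version):

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print("driver:", spark.version, "pyspark package:", pyspark.__version__)

def executor_version(_):
    import pyspark as p   # imported inside the task, i.e. from the executor image
    yield p.__version__

versions = sc.parallelize(range(8), 8).mapPartitions(executor_version).collect()
print("executors:", sorted(set(versions)))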

On Fri, Feb 26, 2021 at 2:53 AM Bode, Meikel, NMA-CFD <meikel.b...@bertelsmann.de> wrote:
Hi All,

After changing to 3.0.2 I face the following issue. Thanks for any hint on that 
issue.

Best,
Meikel

   df = self.spark.read.json(path_in)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 300, 
in json
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
1304, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in 
deco
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, 
in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o76.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
14, 192.168.1.6, executor 0): java.io.InvalidClassException: 
org.apache.spark.broadcast.TorrentBroadcast; local class incompatible: stream 
classdesc serialVersionUID = 4804550167553929379, local class serialVersionUID 
= 3291767831129286585
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:407)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
at 
org.apache.spark.sql.catalyst.json.JsonInferSchema.infer(JsonInferSchema.scala:94)
at 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$5(JsonDataSource.scala:110)
at 
org.apache.spark.sql

Issue after change to 3.0.2

2021-02-26 Thread Bode, Meikel, NMA-CFD
Hi All,

After changing to 3.0.2 I face the following issue. Thanks for any hint on that 
issue.

Best,
Meikel

   df = self.spark.read.json(path_in)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 300, 
in json
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
1304, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in 
deco
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, 
in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o76.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
14, 192.168.1.6, executor 0): java.io.InvalidClassException: 
org.apache.spark.broadcast.TorrentBroadcast; local class incompatible: stream 
classdesc serialVersionUID = 4804550167553929379, local class serialVersionUID 
= 3291767831129286585
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:407)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
at 
org.apache.spark.sql.catalyst.json.JsonInferSchema.infer(JsonInferSchema.scala:94)
at 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$5(JsonDataSource.scala:110)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:110)
at 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:99)
at 
org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:65)
at 
org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:61)
at 
org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
at 

Strange behavior with "bigger" JSON file

2021-01-28 Thread Bode, Meikel, NMA-CFD
Hi all,

I process a lot of JSON files of different sizes. All files share the same 
overall structure. I have no issue with files of sizes around 150-300MB.
Another file of around 530MB now causes errors when I apply selectExpr on the 
resulting DF after reading the file.

AnalysisException: cannot resolve '`entity`' given input columns: 
[_corrupt_record]; line 1 pos 6;
'Project ['LOWER('entity) AS entity#668391, 'extraction_model, 'part, 
'pipeline_run_id, 'timestamp]
+- Relation[_corrupt_record#668389] json

The schema of the read DF looks like:


root

 |-- _corrupt_record: string (nullable = true)

I analyzed the file and it contains all requested columns. Actually I can fully parse it using ijson, as I thought the issue might relate to syntax errors.

Any hints on the _corrupt_record?
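
One common cause of a schema collapsing to a single _corrupt_record column is a file that is one pretty-printed (multi-line) JSON document rather than JSON Lines. A sketch under that assumption (the path is hypothetical); if the schema still comes back as _corrupt_record, the cause lies elsewhere:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .option("multiLine", True)           # parse the whole file as one JSON document
      .json("/data/entities_530mb.json"))  # hypothetical path

df.printSchema()
df.selectExpr("lower(entity) AS entity", "extraction_model", "part",
              "pipeline_run_id", "timestamp").show(5, truncate=False)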

Thanks
Meikel