I finally managed to make it work by following the advice of Robin Moffat, who 
replied to the earlier email:

There's a lot of permutations that you've described, so it's hard to take one 
reproducible test case here to try and identify the error :)
It certainly looks JAR related. You could try adding 
hadoop-hdfs-client-3.3.4.jar to your Flink ./lib folder.

The other route I would go is look at a functioning Flink -> Iceberg/Nessie 
environment and work backwards from there. This looks like a good place to 
start: https://www.dremio.com/blog/using-flink-with-apache-iceberg-and-nessie/

I managed to follow the Dremio blog and get it to work. I'm sharing my 
Dockerfile in case it helps anyone with a similar issue in the future:

FROM flink:1.18.0-scala_2.12

ARG DEBIAN_FRONTEND=noninteractive
COPY packages.txt .
RUN apt-get update && xargs -a packages.txt apt-get -y install && \
    apt-get -y autoremove
RUN mkdir -p /var/run/vsftpd/empty/
ENV TZ=Europe/Madrid
ENV AWS_REGION=eu-west-1
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt


## RabbitMQ connector flink-sql-connector-rabbitmq-3.0.1-1.17.jar
## org/apache/flink/flink-sql-connector-rabbitmq/3.0.1-1.17
RUN curl -L \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/3.0.1-1.17/flink-sql-connector-rabbitmq-3.0.1-1.17.jar \
    -o /opt/flink/lib/flink-sql-connector-rabbitmq-3.0.1-1.17.jar

## Iceberg Flink Library
RUN curl -L \
    https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.3.1/iceberg-flink-runtime-1.16-1.3.1.jar \
    -o /opt/flink/lib/iceberg-flink-runtime-1.16-1.3.1.jar

## Hive Flink Library
RUN curl -L \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar \
    -o /opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar

## Hadoop Common Classes
RUN curl -L \
    https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.8.3/hadoop-common-2.8.3.jar \
    -o /opt/flink/lib/hadoop-common-2.8.3.jar

## Hadoop AWS Classes
RUN curl -L \
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar \
    -o /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

## AWS Bundled Classes
RUN curl -L \
    https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar \
    -o /opt/flink/lib/bundle-2.20.18.jar


Where my requirements.txt looks like this:

apache-flink-libraries==1.18.0
apache-flink==1.18.0

And the packages.txt is:

curl
findutils
python3-pip
python3-requests
python3-software-properties
python-is-python3

I'm not sure I need all the JARs in the Dockerfile but, as they say, if it 
ain't broke, don't fix it.
________________________________
From: Robert Prat <robert.p...@urbiotica.com>
Sent: Friday, April 12, 2024 3:45 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Pyflink w Nessie and Iceberg in S3 Jars

Hi there,

For several days I have been trying to find the right configuration for my 
pipeline, which roughly follows this schema: 
RabbitMQ->PyFlink->Nessie/Iceberg/S3.

I have tried everything described below both locally and with the official 
Flink Docker images.

I have tried several different Flink versions, but for simplicity let's say I 
am using apache-flink==1.18.0. So far I have been able to use the jar in 
org/apache/iceberg/iceberg-flink-runtime-1.18 to connect to RabbitMQ and 
obtain the data from some streams, so I'd say the source side is working.

After that I have been trying to find a way to send the data in those streams 
to Iceberg in S3 through the Nessie catalog, which is the one I already have 
working. I have been using this pipeline with both Spark and Trino for some 
time now, so I know it is working. What I am "simply" trying to do now is use 
my existing Nessie catalog from Flink.

I have tried to connect both directly through sql-client.sh in the bin 
directory of the pyflink install and through Python, as follows:
    table_env.execute_sql(f"""
        CREATE CATALOG nessie WITH (
        'type'='iceberg',
        'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
        'uri'='http://mynessieip:mynessieport/api/v1',
        'ref'='main',
        'warehouse'='s3a://mys3warehouse',
        's3-access-key-id'='{USER_AWS}',
        's3-secret-access-key'='{KEY_AWS}',
        's3-path-style-access'='true',
        'client-region'='eu-west-1')""")
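
As a side note for readers reusing this statement: interpolating credentials straight into an f-string breaks if a value contains a single quote. A minimal sketch of one alternative is to render the WITH clause from a dict. The helper name create_catalog_sql is made up for illustration (it is not part of PyFlink), and reading USER_AWS and KEY_AWS from environment variables is an assumption; the option keys and values are the ones from the statement above.

```python
import os


def create_catalog_sql(name, options):
    """Render a Flink SQL CREATE CATALOG statement from a dict of options.

    Hypothetical helper for illustration only; not a PyFlink API.
    """
    def quote(value):
        # SQL string literals escape a single quote by doubling it
        return "'" + str(value).replace("'", "''") + "'"

    props = ",\n    ".join(f"{quote(k)}={quote(v)}" for k, v in options.items())
    return f"CREATE CATALOG {name} WITH (\n    {props}\n)"


nessie_options = {
    "type": "iceberg",
    "catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "uri": "http://mynessieip:mynessieport/api/v1",
    "ref": "main",
    "warehouse": "s3a://mys3warehouse",
    # Assumption: the credentials are exported under these variable names
    "s3-access-key-id": os.environ.get("USER_AWS", ""),
    "s3-secret-access-key": os.environ.get("KEY_AWS", ""),
    "s3-path-style-access": "true",
    "client-region": "eu-west-1",
}

ddl = create_catalog_sql("nessie", nessie_options)
# The resulting string would then be passed to table_env.execute_sql(ddl)
```

Keeping the options in a dict also makes it easier to print the exact DDL being sent when comparing against a working Spark or Trino configuration.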

The JARs I have included in my pyflink/lib dir (one of the many combinations 
I've tried, with no result; I also tried adding them with env.add_jars or 
--jarfile) are:

  *   hadoop-aws-3.4.0.jar
  *   iceberg-flink-runtime-1.18-1.5.0.jar
  *   hadoop-client-3.4.0.jar
  *   hadoop-common-3.4.0.jar
  *   hadoop-hdfs-3.4.0.jar

Right now I am getting the following error message:

 py4j.protocol.Py4JJavaError: An error occurred while calling o56.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration (...)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration...
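
For anyone debugging a similar NoClassDefFoundError, one quick check is to see which JARs in a lib directory actually contain the missing class, since a JAR is an ordinary zip archive. The following is a minimal sketch, not a Flink tool: the function name jars_containing is made up for illustration, and the directory to scan is whatever lib folder your setup uses.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, lib_dir):
    """Return the names of the JARs directly under lib_dir that contain class_name.

    class_name may use dots (as in ClassNotFoundException) or slashes
    (as in NoClassDefFoundError). Hypothetical helper for illustration.
    """
    # A class a.b.C is stored inside a JAR as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar.name)
        except zipfile.BadZipFile:
            # For example, a failed download saved as an HTML error page
            print(f"skipping unreadable archive: {jar.name}")
    return matches
```

Calling jars_containing("org.apache.hadoop.hdfs.HdfsConfiguration", "pyflink/lib") with the path adjusted to your install returns an empty list when none of the JARs ship that class, which would point to a missing dependency rather than a configuration mistake.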


However, I have gotten several different errors with the different JAR 
combinations I have tried. So my question is: does anybody know whether my 
problem is JAR related, or am I doing something else wrong? I would be 
immensely grateful if someone could guide me to the right steps to implement 
this pipeline.

Thanks:)

