Hi Dani,

Plugins need to be placed in a folder inside the plugins directory, I think
that might be the problem.

Best regards,

Martijn

On Sun, Jul 9, 2023 at 7:00 PM Dániel Pálma <dani...@gmail.com> wrote:

> Thanks for the tips Martijn!
>
> I've fixed the library versions to 1.16 everywhere and also decided to
> scrap pyflink and go for the sql-client instead to keep things simpler for
> now.
>
> This is the Dockerfile I am using for both the *jobmanager* and the
> *sql-client*
>
> FROM flink:1.16.2-scala_2.12-java11
>
> RUN APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/ \
> && HADOOP_VERSION=3.3.5 \
> && wget 
> ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
> \
> && tar xzvf hadoop-${HADOOP_VERSION}.tar.gz \
> && HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
>
> ENV HADOOP_CLASSPATH /opt/flink/hadoop-3.3.5/etc/hadoop:/opt/flink/
> hadoop-3.3.5/share/hadoop/common/lib/*:/opt/flink/hadoop-3.3.5/share/
> hadoop/common/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs:/opt/flink/
> hadoop-3.3.5/share/hadoop/hdfs/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/
> hdfs/*:/opt/flink/hadoop-3.3.5/share/hadoop/mapreduce/*:/opt/flink/
> hadoop-3.3.5/share/hadoop/yarn:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/
> lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/*
>
> COPY lib/flink-json-1.16.1.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-kafka-1.16.2.jar /opt/flink/lib/
> COPY lib/iceberg-flink-runtime-1.16-1.3.0.jar /opt/flink/lib/
> COPY lib/iceberg-hive-runtime-1.3.0.jar /opt/flink/lib/
> COPY lib/hive-metastore-3.1.3.jar /opt/flink/lib/
> COPY lib/hadoop-aws-3.3.5.jar /opt/flink/lib/
> COPY lib/aws-java-sdk-bundle-1.12.316.jar /opt/flink/lib/
>
> COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/
>
> WORKDIR /opt/flink
>
> I start the sql-client via */opt/flink/bin/sql-client.sh embedded*
>
> I am able to create a sink table with the iceberg connector using the
> following query:
>
> create table if not exists clicks_ib
> (
> `timestamp` STRING,
> event STRING,
> user_id STRING,
> site_id STRING,
> url STRING,
> on_site_seconds INT,
> viewed_percent INT
> )
> with ( 'connector'='iceberg',
> 'catalog-name'='hive_catalog',
> 'uri'='thrift://hivemetastore:9083',
> 'warehouse'='s3a://iceberg');
>
> But when I try to select from it, I run into the following error:
>
> *Flink SQL> select * from default_catalog.default_database.clicks_ib;*
>
>
> *[ERROR] Could not execute SQL statement.
> Reason:org.apache.hadoop.hive.metastore.api.MetaException:
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found*
>
> I feel like there is still a little confusion in me on where to place what
> jars, but not exactly sure what is missing.
>
> For reference, I'll paste the current full docker-compose.yml below.
>
> version: "3.7"
> services:
>
> sqlclient:
> container_name: sqlclient
> build: .
> command:
> - /opt/flink/bin/sql-client.sh
> - embedded
> depends_on:
> - jobmanager
> environment:
> - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> rest.address: jobmanager
> volumes:
> - ./flink-sql:/etc/sql
>
> jobmanager:
> build: .
> hostname: "jobmanager"
> container_name: "jobmanager"
> expose:
> - "6123"
> ports:
> - "8081:8081"
> command: jobmanager
> environment:
> - JOB_MANAGER_RPC_ADDRESS=jobmanager
> - AWS_ACCESS_KEY_ID=minio
> - AWS_SECRET_ACCESS_KEY=minio123
> - AWS_REGION=us-east-1
>
> taskmanager:
> image: flink:1.16.2-scala_2.12-java11
> hostname: "taskmanager"
> container_name: "taskmanager"
> expose:
> - "6121"
> - "6122"
> depends_on:
> - jobmanager
> command: taskmanager
> links:
> - jobmanager:jobmanager
> environment:
> - JOB_MANAGER_RPC_ADDRESS=jobmanager
> - AWS_ACCESS_KEY_ID=minio
> - AWS_SECRET_ACCESS_KEY=minio123
> - AWS_REGION=us-east-1
>
> mariadb:
> image: 'mariadb:latest'
> hostname: mariadb
> container_name: mariadb
> ports:
> - '3306:3306'
> environment:
> MYSQL_ROOT_PASSWORD: admin
> MYSQL_USER: admin
> MYSQL_PASSWORD: admin
> MYSQL_DATABASE: metastore_db
> volumes:
> - ./mariadb-data:/var/lib/mysql
>
> hivemetastore:
> hostname: hivemetastore
> container_name: hivemetastore
> build:
> context: hive
> ports:
> - '9083:9083'
> environment:
> METASTORE_DB_HOSTNAME: mariadb
> depends_on:
> - mariadb
>
> minio:
> hostname: "minio"
> image: "minio/minio:latest"
> container_name: "minio"
> ports:
> - "9001:9001"
> - "9000:9000"
> command:
> - "server"
> - "/data"
> - "--console-address"
> - ":9001"
> volumes:
> - "minio:/data"
> environment:
> MINIO_ROOT_USER: "minio"
> MINIO_ROOT_PASSWORD: "minio123"
> networks:
> default:
> aliases:
> - iceberg.minio
>
> mc:
> depends_on:
> - "minio"
> image: "minio/mc"
> container_name: "mc"
> entrypoint: >
> /bin/sh -c "
> until (/usr/bin/mc config host add minio http://minio:9000 minio
> minio123) do echo "...waiting..." && sleep 1; done;
> /usr/bin/mc rm -r --force minio/iceberg;
> /usr/bin/mc mb minio/iceberg;
> /usr/bin/mc policy set public minio/iceberg;
> tail -f /dev/null
> "
>
> broker:
> image: confluentinc/cp-kafka:7.4.0
> hostname: broker
> container_name: broker
> depends_on:
> - controller
> ports:
> - "9092:9092"
> - "9101:9101"
> environment:
> KAFKA_NODE_ID: 1
> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
> 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
> KAFKA_ADVERTISED_LISTENERS:
> 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
> KAFKA_JMX_PORT: 9101
> KAFKA_JMX_HOSTNAME: localhost
> KAFKA_PROCESS_ROLES: 'broker'
> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
> KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
> KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
> # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh
> random-uuid"
> # See
> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>
> controller:
> image: confluentinc/cp-kafka:7.4.0
> hostname: controller
> container_name: controller
> ports:
> - "9093:9093"
> - "9102:9102"
> environment:
> KAFKA_NODE_ID: 2
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
> KAFKA_JMX_PORT: 9102
> KAFKA_JMX_HOSTNAME: localhost
> KAFKA_PROCESS_ROLES: 'controller'
> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
> KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
> KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
> # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh
> random-uuid"
> # See
> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>
> # schema-registry:
> # image: confluentinc/cp-schema-registry:7.4.0
> # hostname: schema-registry
> # container_name: schema-registry
> # depends_on:
> # - broker
> # - controller
> # ports:
> # - "8081:8081"
> # environment:
> # SCHEMA_REGISTRY_HOST_NAME: schema-registry
> # SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
> # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>
> control-center:
> image: confluentinc/cp-enterprise-control-center:7.4.0
> hostname: control-center
> container_name: control-center
> depends_on:
> - broker
> - controller
> # - schema-registry
> ports:
> - "9021:9021"
> environment:
> CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
> # CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081";
> CONTROL_CENTER_REPLICATION_FACTOR: 1
> CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> CONFLUENT_METRICS_TOPIC_REPLICATION: 1
> PORT: 9021
>
> volumes:
> minio: null
>
> networks:
> default:
> name: flinkberg
>
> Best,
>
> Dani
>
> On Thu, Jun 29, 2023 at 9:06 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Dani,
>>
>> There are two things that I notice:
>>
>> 1. You're mixing different Flink versions (1.16 and 1.17): all Flink
>> artifacts should be from the same Flink version
>> 2. S3 plugins need to be added to the plugins folder of Flink, because
>> they are loaded via the plugin mechanism. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>> Best regards,
>>
>> Martijn
>>
>> On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma <dani...@gmail.com> wrote:
>>
>>> Hey folks,
>>>
>>> Nice to meet ya'll!
>>>
>>> I'm trying to get the following stack up and running locally:
>>>
>>> - Kafka as source
>>> - pyFlink SQL
>>> - Iceberg on top of MinIO
>>>
>>> The goal is to have a pyflink script that reads data from a Kafka topic,
>>> does some transformations, and dumps it into an iceberg table.
>>>
>>> I have everything, except for the pyflink app running in Docker, defined
>>> in a docker-compose.yml:
>>>
>>> version: "3.7"
>>> services:
>>>
>>> mariadb:
>>> image: 'mariadb:latest'
>>> hostname: mariadb
>>> container_name: mariadb
>>> ports:
>>> - '3306:3306'
>>> environment:
>>> MYSQL_ROOT_PASSWORD: admin
>>> MYSQL_USER: admin
>>> MYSQL_PASSWORD: admin
>>> MYSQL_DATABASE: metastore_db
>>> volumes:
>>> - ./mariadb-data:/var/lib/mysql
>>> networks:
>>> iceberg_net:
>>>
>>> hive-metastore:
>>> hostname: hive-metastore
>>> container_name: hive-metastore
>>> build:
>>> context: hive
>>> ports:
>>> - '9083:9083'
>>> environment:
>>> METASTORE_DB_HOSTNAME: mariadb
>>> depends_on:
>>> - mariadb
>>> networks:
>>> iceberg_net:
>>>
>>> minio:
>>> hostname: "minio"
>>> image: "minio/minio:latest"
>>> container_name: "minio"
>>> ports:
>>> - "9001:9001"
>>> - "9000:9000"
>>> command:
>>> - "server"
>>> - "/data"
>>> - "--console-address"
>>> - ":9001"
>>> volumes:
>>> - "minio:/data"
>>> environment:
>>> MINIO_ROOT_USER: "minio"
>>> MINIO_ROOT_PASSWORD: "minio123"
>>> networks:
>>> iceberg_net:
>>> aliases:
>>> - iceberg.minio
>>>
>>> mc:
>>> depends_on:
>>> - "minio"
>>> image: "minio/mc"
>>> container_name: "mc"
>>> entrypoint: >
>>> /bin/sh -c "
>>> until (/usr/bin/mc config host add minio http://minio:9000 minio
>>> minio123) do echo "...waiting..." && sleep 1; done;
>>> /usr/bin/mc rm -r --force minio/iceberg;
>>> /usr/bin/mc mb minio/iceberg;
>>> /usr/bin/mc policy set public minio/iceberg;
>>> tail -f /dev/null
>>> "
>>> networks:
>>> iceberg_net:
>>>
>>> broker:
>>> image: confluentinc/cp-kafka:7.4.0
>>> hostname: broker
>>> container_name: broker
>>> depends_on:
>>> - controller
>>> ports:
>>> - "9092:9092"
>>> - "9101:9101"
>>> environment:
>>> KAFKA_NODE_ID: 1
>>> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
>>> 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>>> KAFKA_ADVERTISED_LISTENERS:
>>> 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>>> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>> KAFKA_JMX_PORT: 9101
>>> KAFKA_JMX_HOSTNAME: localhost
>>> KAFKA_PROCESS_ROLES: 'broker'
>>> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>> KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092
>>> '
>>> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>> KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>>> # Replace CLUSTER_ID with a unique base64 UUID using
>>> "bin/kafka-storage.sh random-uuid"
>>> # See
>>> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>> networks:
>>> iceberg_net:
>>>
>>> controller:
>>> image: confluentinc/cp-kafka:7.4.0
>>> hostname: controller
>>> container_name: controller
>>> ports:
>>> - "9093:9093"
>>> - "9102:9102"
>>> environment:
>>> KAFKA_NODE_ID: 2
>>> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>> KAFKA_JMX_PORT: 9102
>>> KAFKA_JMX_HOSTNAME: localhost
>>> KAFKA_PROCESS_ROLES: 'controller'
>>> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>> KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>>> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>> KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>>> # Replace CLUSTER_ID with a unique base64 UUID using
>>> "bin/kafka-storage.sh random-uuid"
>>> # See
>>> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>> networks:
>>> iceberg_net:
>>>
>>> schema-registry:
>>> image: confluentinc/cp-schema-registry:7.4.0
>>> hostname: schema-registry
>>> container_name: schema-registry
>>> depends_on:
>>> - broker
>>> - controller
>>> ports:
>>> - "8081:8081"
>>> environment:
>>> SCHEMA_REGISTRY_HOST_NAME: schema-registry
>>> SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>>> SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>>> networks:
>>> iceberg_net:
>>>
>>> control-center:
>>> image: confluentinc/cp-enterprise-control-center:7.4.0
>>> hostname: control-center
>>> container_name: control-center
>>> depends_on:
>>> - broker
>>> - controller
>>> - schema-registry
>>> ports:
>>> - "9021:9021"
>>> environment:
>>> CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>>> CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081";
>>> CONTROL_CENTER_REPLICATION_FACTOR: 1
>>> CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>>> CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>>> CONFLUENT_METRICS_TOPIC_REPLICATION: 1
>>> PORT: 9021
>>> networks:
>>> iceberg_net:
>>>
>>> networks:
>>> iceberg_net:
>>>
>>> volumes:
>>> minio: null
>>>
>>>
>>> My PyFlink program looks like this currently:
>>>
>>> def demo():
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> env.enable_checkpointing(1000)
>>>
>>> for jar in FLINK_JARS:
>>> print(f"Adding jar: {jar}")
>>> env.add_jars(f"file://{jar}")
>>>
>>> t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>>> t_env.execute_sql("""
>>> CREATE CATALOG hive_catalog WITH (
>>> 'type'='iceberg',
>>> 'catalog-type'='hive',
>>> 'uri'='thrift://localhost:9083',
>>> 'warehouse'='s3a://iceberg'
>>> )
>>> """)
>>>
>>> t_env.use_catalog("hive_catalog")
>>>
>>> t_env.execute_sql("""
>>> create database if not exists iceberg
>>> """)
>>>
>>>
>>> These jars are added to the streaming environment:
>>>
>>> - aws-java-sdk-bundle-1.12.316.jar
>>> - flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar hive-metastore-3.1.3.jar
>>> - flink-s3-fs-hadoop-1.16.1.jar
>>> - flink-sql-connector-kafka-1.16.2.jar
>>> - iceberg-flink-runtime-1.16-1.3.0.jar
>>> - flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar hadoop-aws-3.3.5.jar
>>> - iceberg-hive-runtime-1.3.0.jar
>>>
>>> The metastore operates with the following versions:
>>>
>>> HADOOP_VERSION=3.3.5
>>> METASTORE_VERSION=3.1.3
>>>
>>> And finally, this is the error I am currently running into:
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *pyflink.util.exceptions.TableException:
>>> org.apache.flink.table.api.TableException: Could not execute CREATE
>>> DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog],
>>> databaseName: [iceberg], ignoreIfExists: [true])        at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
>>>       at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>>       at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)        at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>       at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>       at
>>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>       at
>>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>     at
>>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>       at
>>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>       at
>>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>       at java.base/java.lang.Thread.run(Thread.java:829)Caused by:
>>> java.lang.RuntimeException: Failed to create namespace iceberg in Hive
>>> Metastore        at
>>> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
>>>       at
>>> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
>>>       at
>>> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
>>>       at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
>>>       ... 12 moreCaused by:
>>> MetaException(message:java.lang.RuntimeException:
>>> java.lang.ClassNotFoundException: Class
>>> org.apache.hadoop.fs.s3a.S3AFileSystem not found)        at
>>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
>>>       at
>>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
>>>       at
>>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
>>>       at
>>> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>>> at
>>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
>>>       at
>>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
>>>       at
>>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
>>>       at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)        at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>       at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
>>>       at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)        at
>>> org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
>>>       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
>>>   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
>>> at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
>>>       at
>>> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
>>>       ... 15 more*
>>>
>>> I also have the HADOOP classpath added to the environment before running
>>> the script via: *export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop
>>> classpath`*
>>> I'm not well versed in Java but as far as I understand including the
>>> `hadoop-aws` jar in the flink runtime should provide the necessary classes
>>> to run this example, right?
>>>
>>> I was looking for any pointers on where to go from here, or maybe
>>> existing examples of similar setups (I couldn't find any).
>>>
>>> Best,
>>>
>>> Dani
>>>
>>

Reply via email to