Hi Dani,

There are two things that I notice:

1. You're mixing different Flink versions (1.16 and 1.17): all Flink
artifacts should be from the same Flink version
2. S3 plugins need to be added to the plugins folder of Flink, because they
are loaded via the plugin mechanism. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

Best regards,

Martijn

On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma <dani...@gmail.com> wrote:

> Hey folks,
>
> Nice to meet ya'll!
>
> I'm trying to get the following stack up and running locally:
>
> - Kafka as source
> - pyFlink SQL
> - Iceberg on top of MinIO
>
> The goal is to have a pyflink script that reads data from a Kafka topic,
> does some transformations, and dumps it into an iceberg table.
>
> I have everything, except for the pyflink app running in Docker, defined
> in a docker-compose.yml:
>
> version: "3.7"
> services:
>
> mariadb:
> image: 'mariadb:latest'
> hostname: mariadb
> container_name: mariadb
> ports:
> - '3306:3306'
> environment:
> MYSQL_ROOT_PASSWORD: admin
> MYSQL_USER: admin
> MYSQL_PASSWORD: admin
> MYSQL_DATABASE: metastore_db
> volumes:
> - ./mariadb-data:/var/lib/mysql
> networks:
> iceberg_net:
>
> hive-metastore:
> hostname: hive-metastore
> container_name: hive-metastore
> build:
> context: hive
> ports:
> - '9083:9083'
> environment:
> METASTORE_DB_HOSTNAME: mariadb
> depends_on:
> - mariadb
> networks:
> iceberg_net:
>
> minio:
> hostname: "minio"
> image: "minio/minio:latest"
> container_name: "minio"
> ports:
> - "9001:9001"
> - "9000:9000"
> command:
> - "server"
> - "/data"
> - "--console-address"
> - ":9001"
> volumes:
> - "minio:/data"
> environment:
> MINIO_ROOT_USER: "minio"
> MINIO_ROOT_PASSWORD: "minio123"
> networks:
> iceberg_net:
> aliases:
> - iceberg.minio
>
> mc:
> depends_on:
> - "minio"
> image: "minio/mc"
> container_name: "mc"
> entrypoint: >
> /bin/sh -c "
> until (/usr/bin/mc config host add minio http://minio:9000 minio
> minio123) do echo "...waiting..." && sleep 1; done;
> /usr/bin/mc rm -r --force minio/iceberg;
> /usr/bin/mc mb minio/iceberg;
> /usr/bin/mc policy set public minio/iceberg;
> tail -f /dev/null
> "
> networks:
> iceberg_net:
>
> broker:
> image: confluentinc/cp-kafka:7.4.0
> hostname: broker
> container_name: broker
> depends_on:
> - controller
> ports:
> - "9092:9092"
> - "9101:9101"
> environment:
> KAFKA_NODE_ID: 1
> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
> 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
> KAFKA_ADVERTISED_LISTENERS:
> 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
> KAFKA_JMX_PORT: 9101
> KAFKA_JMX_HOSTNAME: localhost
> KAFKA_PROCESS_ROLES: 'broker'
> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
> KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
> KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
> # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh
> random-uuid"
> # See
> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
> networks:
> iceberg_net:
>
> controller:
> image: confluentinc/cp-kafka:7.4.0
> hostname: controller
> container_name: controller
> ports:
> - "9093:9093"
> - "9102:9102"
> environment:
> KAFKA_NODE_ID: 2
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
> KAFKA_JMX_PORT: 9102
> KAFKA_JMX_HOSTNAME: localhost
> KAFKA_PROCESS_ROLES: 'controller'
> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
> KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
> KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
> # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh
> random-uuid"
> # See
> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
> networks:
> iceberg_net:
>
> schema-registry:
> image: confluentinc/cp-schema-registry:7.4.0
> hostname: schema-registry
> container_name: schema-registry
> depends_on:
> - broker
> - controller
> ports:
> - "8081:8081"
> environment:
> SCHEMA_REGISTRY_HOST_NAME: schema-registry
> SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
> SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
> networks:
> iceberg_net:
>
> control-center:
> image: confluentinc/cp-enterprise-control-center:7.4.0
> hostname: control-center
> container_name: control-center
> depends_on:
> - broker
> - controller
> - schema-registry
> ports:
> - "9021:9021"
> environment:
> CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
> CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081";
> CONTROL_CENTER_REPLICATION_FACTOR: 1
> CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> CONFLUENT_METRICS_TOPIC_REPLICATION: 1
> PORT: 9021
> networks:
> iceberg_net:
>
> networks:
> iceberg_net:
>
> volumes:
> minio: null
>
>
> My PyFlink program looks like this currently:
>
> def demo():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.enable_checkpointing(1000)
>
> for jar in FLINK_JARS:
> print(f"Adding jar: {jar}")
> env.add_jars(f"file://{jar}")
>
> t_env = StreamTableEnvironment.create(stream_execution_environment=env)
> t_env.execute_sql("""
> CREATE CATALOG hive_catalog WITH (
> 'type'='iceberg',
> 'catalog-type'='hive',
> 'uri'='thrift://localhost:9083',
> 'warehouse'='s3a://iceberg'
> )
> """)
>
> t_env.use_catalog("hive_catalog")
>
> t_env.execute_sql("""
> create database if not exists iceberg
> """)
>
>
> These jars are added to the streaming environment:
>
> - aws-java-sdk-bundle-1.12.316.jar
> - flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar hive-metastore-3.1.3.jar
> - flink-s3-fs-hadoop-1.16.1.jar
> - flink-sql-connector-kafka-1.16.2.jar
> - iceberg-flink-runtime-1.16-1.3.0.jar
> - flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar hadoop-aws-3.3.5.jar
> - iceberg-hive-runtime-1.3.0.jar
>
> The metastore operates with the following versions:
>
> HADOOP_VERSION=3.3.5
> METASTORE_VERSION=3.1.3
>
> And finally, this is the error I am currently running into:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *pyflink.util.exceptions.TableException:
> org.apache.flink.table.api.TableException: Could not execute CREATE
> DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog],
> databaseName: [iceberg], ignoreIfExists: [true])        at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
>       at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>       at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)        at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.base/java.lang.Thread.run(Thread.java:829)Caused by:
> java.lang.RuntimeException: Failed to create namespace iceberg in Hive
> Metastore        at
> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
>       at
> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
>       at
> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
>       at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
>       ... 12 moreCaused by:
> MetaException(message:java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found)        at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
>       at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
>       at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
>       at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
>       at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
>       at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
>       at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)        at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
>       at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)        at
> org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
>       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
>   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
> at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
>       at
> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
>       ... 15 more*
>
> I also have the HADOOP classpath added to the environment before running
> the script via: *export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop
> classpath`*
> I'm not well versed in Java but as far as I understand including the
> `hadoop-aws` jar in the flink runtime should provide the necessary classes
> to run this example, right?
>
> I was looking for any pointers on where to go from here, or maybe existing
> examples of similar setups (I couldn't find any).
>
> Best,
>
> Dani
>

Reply via email to