petartushev opened a new issue, #11445:
URL: https://github.com/apache/iceberg/issues/11445
### Query engine

I'm using Flink as my engine.

### Question

I have a Java streaming application that uses Flink to process records from Kafka and store them in Iceberg tables, with MinIO as blob storage and a REST Iceberg catalog. I reused part of the Spark docker-compose from the Iceberg docs for the Iceberg REST and MinIO services, and added my own Kafka and Flink services. The compose file is:
```
version: "3"
services:
  jobmanager:
    image: flink:1.19.1-java11
    expose:
      - "6121"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.19.1-java11
    expose:
      - "6122"
      - "6123"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  zookeeper:
    image: wurstmeister/zookeeper:latest
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  minio:
    image: "minio/minio"
    container_name: "minio"
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    volumes:
      - minio-data:/data
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo 'Waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
networks:
  iceberg_net:
volumes:
  minio-data:
```
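Both containers come up, and as far as I can tell they are reachable from the host. A throwaway check like the one below (class name is mine; `/v1/config` is the standard Iceberg REST catalog config endpoint and `/minio/health/live` is MinIO's liveness probe) returns 200 from both services:
```
// Hypothetical smoke test, not part of my job: verifies from the host that the
// REST catalog and MinIO containers respond on the ports published above.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CatalogSmokeTest {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        for (String url : new String[] {
                "http://localhost:8181/v1/config",          // REST catalog config endpoint
                "http://localhost:9000/minio/health/live"   // MinIO liveness probe
        }) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(url + " -> " + response.statusCode() + " " + response.body());
        }
    }
}
```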
My flink streaming application is defined as follows:
```
package org.example;

import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import pojo.Transaction;
import timestamp_utils.TransactionWatermarkStrategy;

import java.util.HashMap;
import java.util.Map;

public class ProcessTransactions {

    public static void main(String[] args) throws Exception {
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transaction")
                .setGroupId("transactions_group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(2000L);
        env.enableCheckpointing(30000); // e.g., 60000 for every minute
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql(
                "CREATE CATALOG bank_transactions_catalog WITH (" +
                        " 'type'='iceberg'," +
                        " 'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'," +
                        " 'uri'='http://localhost:8181'," +
                        " 'warehouse'='s3://warehouse/'," +
                        " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'," +
                        " 's3.endpoint'='http://localhost:9000'," +
                        " 's3.access-key-id'='admin'," +
                        " 's3.secret-access-key'='password'," +
                        " 's3.path-style-access'='true'" +
                        ")"
        );

        DataStream<String> dataStream =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka source");
        DataStream<Transaction> transactionDataStream = dataStream
                .map(value -> new Gson().fromJson(value, Transaction.class))
                .assignTimestampsAndWatermarks(new TransactionWatermarkStrategy());

        TableSchema tableSchema = TableSchema.builder()
                .field("transactionId", DataTypes.STRING())
                .field("sendingClientAccountNumber", DataTypes.STRING())
                .field("receivingClientAccountNumber", DataTypes.STRING())
                .field("amount", DataTypes.DOUBLE())
                .build();

        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("uri", "http://localhost:8181");      // REST catalog URI
        catalogProperties.put("warehouse", "s3://warehouse/");      // S3 warehouse location
        catalogProperties.put("io-impl", S3FileIO.class.getName()); // use S3FileIO
        catalogProperties.put("s3.endpoint", "http://minio:9000");  // MinIO endpoint
        catalogProperties.put("s3.access-key", "admin");            // MinIO access key
        catalogProperties.put("s3.secret-key", "password");         // MinIO secret key
        catalogProperties.put("s3.path-style-access", "true");      // required for MinIO

        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3a.access.key", "admin");
        hadoopConf.set("fs.s3a.secret.key", "password");
        hadoopConf.set("fs.s3a.endpoint", "http://minio:9000");
        hadoopConf.set("fs.s3a.path.style.access", "true");

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "bank_transactions_catalog",          // name of the catalog
                catalogProperties,                    // catalog properties (e.g., S3 config)
                hadoopConf,                           // Hadoop configuration
                "org.apache.iceberg.rest.RESTCatalog" // catalog implementation class (REST)
        );

        tableEnvironment.executeSql(
                "CREATE TABLE transactions (" +
                        "transactionId STRING NOT NULL, " +
                        "sendingClientAccountNumber STRING NOT NULL, " +
                        "receivingClientAccountNumber STRING NOT NULL, " +
                        "amount FLOAT NOT NULL " +
                        ") WITH (" +
                        " 'connector' = 'iceberg', " +
                        " 'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog', " +
                        " 'catalog-name' = 'bank_transactions_catalog', " +
                        " 'database-name' = 'financial_transactions', " +
                        " 'table-name' = 'transactions', " +
                        " 'format' = 'parquet', " +
                        " 'warehouse' = 's3://warehouse/', " +
                        " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', " +
                        " 'catalog-rest.endpoint' = 'http://localhost:8181', " +
                        " 's3.endpoint' = 'http://minio:9000', " +
                        " 's3.access-key-id' = 'admin', " +
                        " 's3.secret-access-key' = 'password', " +
                        " 's3.path-style-access' = 'true' " +
                        ")"
        );

        TableLoader tableLoader = TableLoader.fromCatalog(
                catalogLoader,
                TableIdentifier.of("financial_transactions", "transactions"));

        DataStream<Row> rowDataStream = transactionDataStream.map(transaction -> Row.of(
                transaction.getTransactionId(),
                transaction.getSendingClientAccountNumber(),
                transaction.getReceivingClientAccountNumber(),
                transaction.getAmount()));

        FlinkSink.forRow(rowDataStream, tableSchema)
                .tableLoader(tableLoader)
                .overwrite(false)
                .append();

        try {
            env.execute(ProcessTransactions.class.getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
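As a sanity check independent of Flink, I can also talk to the REST catalog directly with the Iceberg API to see what it actually contains. A throwaway sketch (class name is mine, same properties as the job above):
```
// Hypothetical check, not part of my job: lists the catalog's namespaces and
// looks for the target table, bypassing the Flink layer entirely.
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;

public class ListCatalogContents {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("uri", "http://localhost:8181");
        props.put("warehouse", "s3://warehouse/");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        props.put("s3.endpoint", "http://localhost:9000");
        props.put("s3.access-key-id", "admin");
        props.put("s3.secret-access-key", "password");
        props.put("s3.path-style-access", "true");

        try (RESTCatalog catalog = new RESTCatalog()) {
            catalog.initialize("bank_transactions_catalog", props);
            System.out.println("namespaces: " + catalog.listNamespaces());
            System.out.println("table exists: " + catalog.tableExists(
                    TableIdentifier.of("financial_transactions", "transactions")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```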
If I run the application in debug mode and evaluate `tableEnvironment.executeSql("SELECT * FROM transactions").collect()`, I get the error:
```
Unable to create a source for reading table
'bank_transactions_catalog.financial_transactions.transactions'.
Table options are:
'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'
'catalog-name'='bank_transactions_catalog'
'catalog-rest.endpoint'='http://localhost:8181'
'connector'='iceberg'
'database-name'='financial_transactions'
'format'='parquet'
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
's3.access-key-id'='admin'
's3.endpoint'='http://minio:9000'
's3.path-style-access'='true'
's3.secret-access-key'='******'
'table-name'='transactions'
'warehouse'='s3://warehouse/'
```
with the cause being: `java.lang.NullPointerException: Invalid uri for http
client: null`.
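My guess is that the NPE means the REST client never receives a catalog URI: the table's WITH clause sets `catalog-rest.endpoint`, but the Iceberg Flink connector docs I can find use `uri`, `catalog-database`, and `catalog-table` rather than the keys I used. If that guess is right, the DDL would presumably need to look more like this (untested):
```
// Untested guess based on the Iceberg Flink connector docs: pass 'uri' (the
// standard REST catalog property) plus 'catalog-database'/'catalog-table',
// instead of 'catalog-rest.endpoint', 'database-name' and 'table-name'.
tableEnvironment.executeSql(
        "CREATE TABLE transactions (" +
                "transactionId STRING NOT NULL, " +
                "sendingClientAccountNumber STRING NOT NULL, " +
                "receivingClientAccountNumber STRING NOT NULL, " +
                "amount FLOAT NOT NULL " +
                ") WITH (" +
                " 'connector' = 'iceberg', " +
                " 'catalog-name' = 'bank_transactions_catalog', " +
                " 'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog', " +
                " 'uri' = 'http://localhost:8181', " +
                " 'warehouse' = 's3://warehouse/', " +
                " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', " +
                " 's3.endpoint' = 'http://minio:9000', " +
                " 's3.path-style-access' = 'true', " +
                " 'catalog-database' = 'financial_transactions', " +
                " 'catalog-table' = 'transactions' " +
                ")"
);
```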
However, if I run the program outside debug mode, it fails with:
```
Exception in thread "main"
org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist:
financial_transactions.transactions
```
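That second error at least makes some sense to me: I suspect my `CREATE TABLE transactions ...` lands in Flink's default in-memory catalog (I never run `USE CATALOG bank_transactions_catalog`), so no `financial_transactions.transactions` table is ever created in the REST catalog for `TableLoader.fromCatalog(...)` to find. If that is the actual problem, I could presumably create the namespace and table up front through the Iceberg API, along these lines (an unverified sketch; class name is mine, identifiers match my setup):
```
// Unverified sketch of a possible fix: create the namespace and table in the
// REST catalog before the Flink job starts, so the sink's TableLoader finds it.
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;

public class CreateTransactionsTable {
    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("uri", "http://localhost:8181");
        props.put("warehouse", "s3://warehouse/");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        props.put("s3.endpoint", "http://localhost:9000");
        props.put("s3.access-key-id", "admin");
        props.put("s3.secret-access-key", "password");
        props.put("s3.path-style-access", "true");

        // Iceberg schema mirroring the Flink TableSchema in the job above.
        Schema schema = new Schema(
                Types.NestedField.required(1, "transactionId", Types.StringType.get()),
                Types.NestedField.required(2, "sendingClientAccountNumber", Types.StringType.get()),
                Types.NestedField.required(3, "receivingClientAccountNumber", Types.StringType.get()),
                Types.NestedField.required(4, "amount", Types.DoubleType.get()));

        try (RESTCatalog catalog = new RESTCatalog()) {
            catalog.initialize("bank_transactions_catalog", props);
            Namespace ns = Namespace.of("financial_transactions");
            if (!catalog.namespaceExists(ns)) {
                catalog.createNamespace(ns);
            }
            TableIdentifier id = TableIdentifier.of(ns, "transactions");
            if (!catalog.tableExists(id)) {
                catalog.createTable(id, schema);
            }
        }
    }
}
```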
This is probably a newbie problem, but I haven't been able to figure out how to debug it for quite some time. Any help or general advice towards resolving this issue is welcome.