Hi

We are running a Flink job that delivers Kafka data to an Iceberg table.
The job uses the org.apache.iceberg.flink.CatalogLoader and
org.apache.iceberg.flink.TableLoader interfaces in combination with
org.apache.iceberg.flink.sink.FlinkSink, where the catalog type is Hive.
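
For quick reference, this is roughly how the pieces are wired together (condensed
from the Main and CustomStreamer snippets attached below; catalogName, databaseSchema,
topic, rowStream and tableSchema stand in for the values built in that code):

// Hive catalog backed by the metastore, with an s3a:// warehouse (see the properties map in Main).
CatalogLoader catalogLoader = CatalogLoader.hive(catalogName, hadoopConf, properties);
// Loader for the sink table, resolved through that catalog.
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.parse(databaseSchema + "." + topic));
// Row-based Iceberg sink attached to the Kafka-derived stream.
FlinkSink.forRow(rowStream, tableSchema)
        .tableLoader(tableLoader)
        .build();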

We have had success running multiple jobs writing to their respective tables
when those tables are stored in the same S3 bucket. Recently, however, when
attempting to write to tables stored in separate S3 buckets, we have run into
issues. The first jobs submitted to the cluster run fine, but when we submit
further jobs whose sink tables have the same name (in separate database
schemas and S3 buckets), we hit a java.lang.ClassCastException as well as an
org.apache.hadoop.metrics2.MetricsException stating: Metrics source
S3AMetrics{bucket-name} already exists!
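
To make the failing combination concrete, the two submissions look roughly like
this (bucket, schema and table names are made up for illustration; each job is a
separate run of the attached Main with its own S3_BUCKET and database_schema):

// Job 1 -- table "events" in schema db_a, warehouse on the first bucket: runs fine.
Map<String, String> propsA = new HashMap<>();
propsA.put("uri", "thrift://hivemetastore-hive-metastore:9083");
propsA.put("warehouse", "s3a://bucket-a/datalake");
propsA.put("catalog-type", "hive");
CatalogLoader loaderA = CatalogLoader.hive("db_a.events", hadoopConf, propsA);
TableLoader tableA = TableLoader.fromCatalog(loaderA, TableIdentifier.parse("db_a.events"));

// Job 2 -- same table name "events", but schema db_b and a warehouse on a second bucket:
// this is the job that fails with the ClassCastException / MetricsException shown below.
Map<String, String> propsB = new HashMap<>();
propsB.put("uri", "thrift://hivemetastore-hive-metastore:9083");
propsB.put("warehouse", "s3a://bucket-b/datalake");
propsB.put("catalog-type", "hive");
CatalogLoader loaderB = CatalogLoader.hive("db_b.events", hadoopConf, propsB);
TableLoader tableB = TableLoader.fromCatalog(loaderB, TableIdentifier.parse("db_b.events"));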

Attached are the error log as well as the main code snippets and pom file for
better context. Any help would be greatly appreciated.

The Flink cluster version is 1.12.7, and we have enabled the flink-s3-fs-hadoop
plugin so that the jobs can write to S3.
java.lang.ClassCastException: org.apache.hadoop.fs.s3a.S3AStorageStatistics cannot be cast to org.apache.hadoop.fs.s3a.S3AStorageStatistics
        at org.apache.hadoop.fs.s3a.S3AFileSystem.createStorageStatistics(S3AFileSystem.java:636)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initializeStatisticsBinding(S3AFileSystem.java:578)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:401)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:51)
        at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
        at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
        at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:252)
        at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:179)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
        at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:178)
        at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:160)
        at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:200)
        at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:94)
        at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:77)
        at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:93)
        at org.apache.iceberg.flink.TableLoader$CatalogTableLoader.loadTable(TableLoader.java:113)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:125)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:432)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        at java.lang.Thread.run(Thread.java:748)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0";
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";>

   <!-- <repositories>
       <repository>
           <id>local-maven-repo</id>
           <url>file://${project.basedir}/local-maven-repo</url>
       </repository>
   </repositories> -->

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.matrix</groupId>
    <artifactId>Java</artifactId>
    <version>1.0</version>

    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
        <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
        <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
        <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
        <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>5.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapred</artifactId>
            <version>0.22.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.javatuples</groupId>
            <artifactId>javatuples</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.5</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.12.5</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-uber_2.11</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-uber-blink_2.11</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.5</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.12.5</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.3.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire-plugin.version}</version>
                <configuration>
                    <parallel>all</parallel>
                    <threadCount>4</threadCount>
                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.surefire</groupId>
                        <artifactId>surefire-junit47</artifactId>
                        <version>${maven-surefire-plugin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>

            <!-- Ensure that the Maven jar plugin runs before the Maven
              shade plugin by listing the plugin higher within the file. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>${maven-jar-plugin.version}</version>
            </plugin>

            <!--
              Configures `mvn package` to produce a bundled jar ("fat jar") for runners
              that require this for job submission to a cluster.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>${project.artifactId}-bundled-${project.version}</finalName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/LICENSE</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <version>${maven-exec-plugin.version}</version>
                    <configuration>
                        <cleanupDaemonThreads>false</cleanupDaemonThreads>
                        <mainClass>Main</mainClass>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>
import Models.IcebergJob;
import org.apache.commons.cli.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.hadoop.conf.Configuration;

import javax.xml.crypto.Data;
import java.util.*;
import java.util.logging.Logger;

public class Main {
    public enum Environment {
        DEV,
        CLOUDKARAFKA,
        PROD
    }

    static KafkaConfig kafkaConfig = new KafkaConfig(Environment.PROD);
    static Logger log = Logger.getLogger(Main.class.getName());

    public static void runStreamer(IcebergSchema icebergSchema, IcebergJob job) {
        final Configuration hadoopConf = new Configuration();
        hadoopConf.addResource(new Path("/opt/flink", "hive-site.xml"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(180000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("s3a://" + System.getenv("S3_BUCKET") + "/flink-checkpoints/backend"));

        System.out.println("Environment started....");

        Map<String, String> properties = new HashMap<String, String>();
        properties.put("uri", "thrift://hivemetastore-hive-metastore:9083");
        properties.put("warehouse", 
            "s3a://" + System.getenv("S3_BUCKET") + "/" + 
            System.getenv("S3_PIPELINE_DATALAKE_DIR") 
        );
        properties.put("clients", "2");
        properties.put("catalog-type", "hive");
        System.out.println("warehouse = "+properties.get("warehouse"));
        System.out.println("Starting catalog loader");

//        List<String> topics = new ArrayList<>();
//        topics.add(job.topic);

        try {
            final OutputTag<Row> exceptionTableTag = new OutputTag<Row>("exception-sideout"){};

            SingleOutputStreamOperator<Row> passThroughStream =
                CustomStreamer.getStreamer(job.account+"."+job.topic, env, kafkaConfig.kafkaProperties, Time.milliseconds(job.windowPeriod), job.schema, exceptionTableTag);

            if (icebergSchema != null) {
                CatalogLoader catalogLoader = CatalogLoader.hive(job.database_schema+"."+job.topic, hadoopConf, properties);
                
                System.out.println("Identifier name = "+TableIdentifier.parse(job.database_schema+"."+job.topic).name());
                System.out.println("ToString name = "+TableIdentifier.parse(job.database_schema+"."+job.topic).toString());
                CustomStreamer.mainStreamer(passThroughStream, catalogLoader, TableIdentifier.parse(job.database_schema+"."+job.topic), icebergSchema);
                String exceptionTopic = (System.getenv("EXCEPTION_TABLE_NAME") != null) ? System.getenv("EXCEPTION_TABLE_NAME") : Constants.PIPELINE_NAME;
                String exceptionSchemaName = (System.getenv("EXCEPTION_SCHEMA_NAME") != null) ? System.getenv("EXCEPTION_SCHEMA_NAME") : job.account.replace("-","_")+"_"+Constants.EXCEPTION_SCHEMA_NAME;
                
                System.out.println("Identifier name = "+TableIdentifier.parse(exceptionSchemaName+'.'+exceptionTopic).name());
                System.out.println("ToString name = "+TableIdentifier.parse(exceptionSchemaName+'.'+exceptionTopic).toString());
                CustomStreamer.exceptionStreamer(
                    exceptionTopic, 
                    passThroughStream.getSideOutput(exceptionTableTag), 
                    catalogLoader, 
                    TableIdentifier.parse(exceptionSchemaName+'.'+exceptionTopic)
                );

                env.execute(Constants.PIPELINE_NAME+"."+job.account+"."+job.topic);
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    public static void main(String[] args) throws ParseException {
        CommandLineParser parser = new DefaultParser();

        log.info("Running version:" + Constants.VERSION); // TODO update version with build number

        Options options = new Options();
        options.addOption("account", true, "Commander Account for which the job will run");
        options.addOption("topic", true, "Kafka topic name / iceberg table name");
        options.addOption("schema", true, "Iceberg table schema");
        options.addOption("database_schema", true, "Iceberg catalog name");
        options.addOption("window_period", true, "Window period of tumbling window (ms)");

        CommandLine cmd = parser.parse(options, args);
        IcebergJob icebergJob = new IcebergJob(cmd);

        IcebergSchema icebergSchema = IcebergHelper.getSchemaFromJobSubmissionAPI(icebergJob, true);
        runStreamer(icebergSchema, icebergJob);
    }
}
import Models.IcebergJob;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.javatuples.Pair;
import org.json.JSONException;
import org.json.JSONObject;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.*;

import java.time.Duration;
import java.util.logging.Logger;

public class CustomStreamer {
    static Logger log = Logger.getLogger(CustomStreamer.class.getName());


    public static void exceptionStreamer(String exceptionTopicName, DataStream<Row> stream, CatalogLoader catalogLoader, TableIdentifier id) {
        IcebergJob exceptionOutput = new IcebergJob();
        exceptionOutput.topic = exceptionTopicName;
        exceptionOutput.schema = Constants.EXCEPTION_TOPIC_SCHEMA;
        IcebergSchema schema = IcebergHelper.getSchemaFromJobSubmissionAPI(exceptionOutput, false);
        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, id);

        TableSchema.Builder schemaBuilder = TableSchema.builder().fields(schema.Columns, schema.DataTypes);
        FlinkSink.forRow(stream, schemaBuilder.build())
                .tableLoader(tableLoader)
                .build();
    }

    public static void mainStreamer(DataStream<Row> stream, CatalogLoader catalogLoader, TableIdentifier id, IcebergSchema schema) {
        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, id);

        TableSchema.Builder schemaBuilder = TableSchema.builder().fields(schema.Columns, schema.DataTypes);
        FlinkSink.forRow(stream, schemaBuilder.build())
                .tableLoader(tableLoader)
                .build();
    }


}
