[ https://issues.apache.org/jira/browse/FLINK-28337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561310#comment-17561310 ]

Martijn Visser commented on FLINK-28337:
----------------------------------------

[~migowei] The Flink project doesn't support Iceberg; are you using https://github.com/apache/iceberg/tree/master/flink ?

> java.lang.IllegalArgumentException: Table identifier not set
> ------------------------------------------------------------
>
>                 Key: FLINK-28337
>                 URL: https://issues.apache.org/jira/browse/FLINK-28337
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.2
>         Environment: Flink 1.14.2
>                      Hive 3.1.2
>                      Iceberg 0.12.1
>                      Hadoop 3.2.1
>            Reporter: wei
>            Priority: Major
>
> I use the Flink Table API to select from an Iceberg table. I registered a
> HiveCatalog and made it the current catalog, but it looks like
> default_catalog is still being used.
> The error message is as follows:
> {code:java}
> 10:42:41,886 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
> 10:42:44,392 INFO  org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,397 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe [] - Using schema from existing table {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:44,832 INFO  org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: s3a://warehouse/s3a_flink.db/icebergTBCloudTrackingTest/metadata/00011-8d1ef9f1-8172-49fd-b0de-58196642b662.metadata.json
> 10:42:44,866 INFO  org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,867 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe [] - Using schema from existing table {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://hiveserver:9083
> 10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 3
> 10:42:48,081 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to metastore.
> 10:42:48,081 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=root (auth:SIMPLE) retries=1 delay=1 lifetime=0
> 10:42:48,132 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a connection to metastore, current connections: 2
> 10:42:48,308 INFO  org.apache.flink.connectors.hive.HiveParallelismInference [] - Hive source(s3a_flink.icebergTBCloudTrackingTest}) getNumFiles use time: 171 ms, result: 2
> Exception in thread "main" java.lang.IllegalArgumentException: Table identifier not set
>     at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:114)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:89)
>     at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.lambda$getSplits$0(IcebergInputFormat.java:102)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.getSplits(IcebergInputFormat.java:102)
>     at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getSplits(MapredIcebergInputFormat.java:69)
>     at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:98)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:107)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104) {code}
> The code is:
> {code:java}
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .inBatchMode()
>         .build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String catalogName = "s3IcebergCatalog";
> String defaultDatabase = "s3a_flink";
> String hiveConfDir = "flink-cloud/src/main/resources";
> HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>
> tableEnv.registerCatalog(catalogName, hive);
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> System.out.println(tableEnv.getCurrentCatalog());
>
> String tableName = "icebergTBCloudTrackingTest";
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> String sql = "select uuid from " + tableName;
> System.out.println(sql);
> tableEnv.executeSql(sql).print();
> {code}
> The output of `tableEnv.getCurrentCatalog()` is `s3IcebergCatalog`, yet the
> log reports `10:42:44,866 INFO org.apache.iceberg.BaseMetastoreCatalog [] -
> Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest`,
> and the query fails with `java.lang.IllegalArgumentException: Table
> identifier not set`.
> Does anyone know the reason?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
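For reference, the comment above points at Iceberg's own Flink integration: it ships its own catalog implementation, so the table is read by Iceberg's Flink source rather than going through Flink's Hive connector (which is the code path failing in the stack trace). A minimal sketch of registering such a catalog via DDL, per the Iceberg Flink documentation; the catalog name and warehouse path below are placeholders, and the metastore URI is taken from the log above:

{code:sql}
-- Sketch (requires the iceberg-flink-runtime jar on the classpath);
-- 'iceberg_hive' and the warehouse path are placeholder values.
CREATE CATALOG iceberg_hive WITH (
  'type'       = 'iceberg',
  'catalog-type' = 'hive',
  'uri'        = 'thrift://hiveserver:9083',
  'warehouse'  = 's3a://warehouse'
);

USE CATALOG iceberg_hive;
SELECT uuid FROM s3a_flink.icebergTBCloudTrackingTest;
{code}

With this setup the `SELECT` is planned against Iceberg's own table source, instead of the `HiveTableSource` path shown in the stack trace.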