[ https://issues.apache.org/jira/browse/FLINK-28276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17560815#comment-17560815 ]
luoyuxia edited comment on FLINK-28276 at 6/30/22 4:25 AM: ----------------------------------------------------------- It's about iceberg sdk. I'm not familiar with it. And I'm not sure whether we can specify the input format using sdk. If not, I think you may need to use Hive SQL in HiveCli or switch to [hive dialect|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_dialect] in Flink. was (Author: luoyuxia): It's about iceberg sdk. I'm not familar with it. And I'm not sure wheher we can specific the inputfomat using sdk. If not, I think you may need to use Hive SQL in HiveCli or switch to [Hive dialect|[https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_dialect|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_dialect/]] in Flink. > org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate > the hadoop input format > -------------------------------------------------------------------------------------------------- > > Key: FLINK-28276 > URL: https://issues.apache.org/jira/browse/FLINK-28276 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive > Affects Versions: 1.14.2 > Environment: Flink 1.14.2 > Hive 3.1.2 > Scala 2.12 > Iceberg 0.12.1 > Hadoop 3.2.1 > Reporter: wei > Priority: Major > Attachments: BA9CEEA0-BF38-4568-A7AD-66C68B19CF14.png, > image-2022-06-30-09-37-44-705.png, image-2022-06-30-10-36-17-075.png > > > When I read Iceberg tables using Flink HiveCatalog, based on S3A, I got this > error: > > {code:java} > //代码占位符 > Exception in thread "main" > org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate > the hadoop input format > at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100) > at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71) > at > 
org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149) > at > org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107) > at > org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95) > at > org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144) > at > org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) > at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601) > at loshu.flink.hive.FlinkSQLHiveWriter.main(FlinkSQLHiveWriter.java:69) > Caused by: java.lang.InstantiationException > at > sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at java.lang.Class.newInstance(Class.java:442) > at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:98) > ... 30 more > 16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl > [] - Stopping s3a-file-system metrics system... > 16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl > [] - s3a-file-system metrics system stopped. 
> 16:33:36,768 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl > [] - s3a-file-system metrics system shutdown complete.Process finished with > exit code 1 > {code} > My code is: > {code:java} > //代码占位符 > public class FlinkSQLHiveWriter { > private static org.apache.log4j.Logger log = > Logger.getLogger(FlinkSQLHiveWriter.class); > public static void main(String[] args) throws Exception { > System.setProperty("HADOOP_USER_NAME", "root"); > System.setProperty("hadoop.home.dir", "/opt/hadoop-3.2.1/"); > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .inBatchMode() > .build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > String catalogName = "s3IcebergCatalog"; > String defaultDatabase = "s3a_flink"; > String hiveConfDir = "flink-cloud/src/main/resources"; > HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, > hiveConfDir); > tableEnv.registerCatalog(catalogName, hive); > tableEnv.useCatalog(catalogName); > tableEnv.useDatabase(defaultDatabase); > System.out.println(hive.listDatabases()); > System.out.println(hive.listTables(defaultDatabase)); > String tableName = "icebergTBCloudTracking"; > // set sql dialect as default, means using flink sql. > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > String sql = "select vin from " + tableName; > // String sql = "DESC " + tableName; > System.out.println(sql); > Table table = tableEnv.sqlQuery(sql); > table.execute(); > } > } {code} > I can "show tables" or "describe tables", but when using "select * from > table" the error occurs. > -- This message was sent by Atlassian Jira (v8.20.10#820010)