[ https://issues.apache.org/jira/browse/FLINK-28276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17560815#comment-17560815 ]

luoyuxia edited comment on FLINK-28276 at 6/30/22 4:24 AM:
-----------------------------------------------------------

This is about the Iceberg SDK, which I'm not familiar with, so I'm not sure whether 
you can specify the input format through the SDK.

If not, I think you may need to run Hive SQL in the Hive CLI, or switch to the [Hive 
dialect|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_dialect/]
 in Flink.
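
For the Hive dialect route, a minimal sketch (this assumes the tableEnv and table 
name from the reporter's code below, and the Flink Hive connector on the classpath):

{code:java}
// Switch the current session to the Hive dialect before issuing the query.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.sqlQuery("select vin from icebergTBCloudTracking").execute().print();
{code}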



> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate 
> the hadoop input format
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28276
>                 URL: https://issues.apache.org/jira/browse/FLINK-28276
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.2
>         Environment: Flink 1.14.2
> Hive 3.1.2
> Scala 2.12
> Iceberg 0.12.1
> Hadoop 3.2.1
>            Reporter: wei
>            Priority: Major
>         Attachments: BA9CEEA0-BF38-4568-A7AD-66C68B19CF14.png, 
> image-2022-06-30-09-37-44-705.png, image-2022-06-30-10-36-17-075.png
>
>
> When I read an Iceberg table using Flink's HiveCatalog, backed by S3A, I get this 
> error:
>  
> {code:java}
> Exception in thread "main" org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
>     at loshu.flink.hive.FlinkSQLHiveWriter.main(FlinkSQLHiveWriter.java:69)
> Caused by: java.lang.InstantiationException
>     at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at java.lang.Class.newInstance(Class.java:442)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:98)
>     ... 30 more
> 16:33:36,767 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Stopping s3a-file-system metrics system...
> 16:33:36,767 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system stopped.
> 16:33:36,768 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system shutdown complete.
> Process finished with exit code 1
>  {code}
> My code is:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.log4j.Logger;
>
> public class FlinkSQLHiveWriter {
>     private static final Logger log = Logger.getLogger(FlinkSQLHiveWriter.class);
>
>     public static void main(String[] args) throws Exception {
>         System.setProperty("HADOOP_USER_NAME", "root");
>         System.setProperty("hadoop.home.dir", "/opt/hadoop-3.2.1/");
>         // Build a batch TableEnvironment.
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         // Register a HiveCatalog; hive-site.xml is read from hiveConfDir.
>         String catalogName = "s3IcebergCatalog";
>         String defaultDatabase = "s3a_flink";
>         String hiveConfDir = "flink-cloud/src/main/resources";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         System.out.println(hive.listDatabases());
>         System.out.println(hive.listTables(defaultDatabase));
>         String tableName = "icebergTBCloudTracking";
>         // Set the SQL dialect to DEFAULT, i.e. use Flink SQL.
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sql = "select vin from " + tableName;
> //        String sql = "DESC " + tableName;
>         System.out.println(sql);
>         Table table = tableEnv.sqlQuery(sql);
>         // The exception above is thrown here (FlinkSQLHiveWriter.java:69).
>         table.execute();
>     }
> }
> {code}
> I can "show tables" or "describe tables", but when using "select * from 
> table" the error occurs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
