Have you tested writing data through Hive itself? Based on the exception you provided, this points to an HDFS problem rather than a Flink one.
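As a quick check, here is a minimal write test from beeline against the pokes table you showed earlier (the values are arbitrary placeholders, and this assumes INSERT ... VALUES is enabled in your Hive build):

0: jdbc:hive2://localhost:10000> insert into table pokes values (100, 'hdfs_write_test');
0: jdbc:hive2://localhost:10000> select * from pokes where foo = 100;

If the insert fails with the same "could only be replicated to 0 nodes" error, the problem is in HDFS itself; if it succeeds from inside the containers, the problem is more likely the network path from the Flink client to the datanode.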
--
Best,
wldd

On 2020-05-26 17:49:56, "Enzo wang" <sre.enzow...@gmail.com> wrote:

>Hi Wldd,
>
>Hive is accessible. I have pasted the contents of the two earlier gists into
>this email; please take a look, they include the results of querying data in
>Hive.
>
>Let me know if there is anything else I should provide.
>
>
>======== flink insert into hive error ========
>
>org.apache.flink.table.api.TableException: Exception in close
>    at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
>    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
>    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>    at java.base/java.lang.Thread.run(Thread.java:830)
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>/user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>could only be replicated to 0 nodes instead of minReplication (=1). There are
>1 datanode(s) running and 1 node(s) are excluded in this operation.
>    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
>    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
>    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
>    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
>    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
>    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
>    at java.security.AccessController.doPrivileged(Native Method)
>    at javax.security.auth.Subject.doAs(Subject.java:422)
>    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
>
>    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>    at org.apache.hadoop.ipc.Client.call(Client.java:1413)
>    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>    at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
>    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>    at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
>    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
>    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
>    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
>
>
>======== Flink 1.10.0 lib directory ========
>
>mysql-connector-java-5.1.48.jar
>slf4j-log4j12-1.7.15.jar
>log4j-1.2.17.jar
>flink-table_2.11-1.10.0.jar
>flink-table-blink_2.11-1.10.0.jar
>flink-dist_2.11-1.10.0.jar
>flink-jdbc_2.11-1.10.0.jar
>flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
>flink-sql-connector-kafka_2.11-1.10.0.jar
>flink-json-1.10.0.jar
>flink-connector-hive_2.11-1.10.0.jar
>flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
>hive-exec-2.3.7.jar
>flink-csv-1.10.1.jar
>
>
>======== Hive table "pokes" ========
>
>❯ docker-compose exec hive-server bash
>root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
>SLF4J: Class path contains multiple SLF4J bindings.
>SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>Connecting to jdbc:hive2://localhost:10000
>Connected to: Apache Hive (version 2.3.2)
>Driver: Hive JDBC (version 2.3.2)
>Transaction isolation: TRANSACTION_REPEATABLE_READ
>Beeline version 2.3.2 by Apache Hive
>0: jdbc:hive2://localhost:10000> describe formatted pokes;
>+-------------------------------+----------------------------------------------------+-------------+
>| col_name                      | data_type                                          | comment     |
>+-------------------------------+----------------------------------------------------+-------------+
>| # col_name                    | data_type                                          | comment     |
>|                               | NULL                                               | NULL        |
>| foo                           | int                                                |             |
>| bar                           | string                                             |             |
>|                               | NULL                                               | NULL        |
>| # Detailed Table Information  | NULL                                               | NULL        |
>| Database:                     | default                                            | NULL        |
>| Owner:                        | root                                               | NULL        |
>| CreateTime:                   | Tue May 26 05:42:30 UTC 2020                       | NULL        |
>| LastAccessTime:               | UNKNOWN                                            | NULL        |
>| Retention:                    | 0                                                  | NULL        |
>| Location:                     | hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL        |
>| Table Type:                   | MANAGED_TABLE                                      | NULL        |
>| Table Parameters:             | NULL                                               | NULL        |
>|                               | numFiles                                           | 4           |
>|                               | numRows                                            | 0           |
>|                               | rawDataSize                                        | 0           |
>|                               | totalSize                                          | 5839        |
>|                               | transient_lastDdlTime                              | 1590480090  |
>|                               | NULL                                               | NULL        |
>| # Storage Information         | NULL                                               | NULL        |
>| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL        |
>| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat           | NULL        |
>| OutputFormat:                 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
>| Compressed:                   | No                                                 | NULL        |
>| Num Buckets:                  | -1                                                 | NULL        |
>| Bucket Columns:               | []                                                 | NULL        |
>| Sort Columns:                 | []                                                 | NULL        |
>| Storage Desc Params:          | NULL                                               | NULL        |
>|                               | serialization.format                               | 1           |
>+-------------------------------+----------------------------------------------------+-------------+
>30 rows selected (0.328 seconds)
>
>0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
>+------------+------------+
>| pokes.foo  | pokes.bar  |
>+------------+------------+
>| 25         | Tommy      |
>| 26         | Tommy      |
>| 27         | Tommy      |
>| 238        | val_238    |
>| 86         | val_86     |
>| 311        | val_311    |
>| 27         | val_27     |
>| 165        | val_165    |
>| 409        | val_409    |
>| 255        | val_255    |
>+------------+------------+
>10 rows selected (0.622 seconds)
>
>
>======== Hive table "pokes" in Flink ========
>
>Flink SQL> describe pokes;
>root
> |-- foo: INT
> |-- bar: STRING
>
>
>======== hadoop/hive environment ========
>
>version: "3"
>
>services:
>  namenode:
>    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - namenode:/hadoop/dfs/name
>    environment:
>      - CLUSTER_NAME=test
>    env_file:
>      - ./hadoop-hive.env
>    ports:
>      - "50070:50070"
>      - "8020:8020"
>  datanode:
>    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - datanode:/hadoop/dfs/data
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070"
>    ports:
>      - "50075:50075"
>  hive-server:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
>      SERVICE_PRECONDITION: "hive-metastore:9083"
>    ports:
>      - "10000:10000"
>  hive-metastore:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    command: /opt/hive/bin/hive --service metastore
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
>    ports:
>      - "9083:9083"
>  hive-metastore-postgresql:
>    image: bde2020/hive-metastore-postgresql:2.3.0
>    ports:
>      - "5432:5432"
>  presto-coordinator:
>    image: shawnzhu/prestodb:0.181
>    ports:
>      - "8080:8080"
>
>volumes:
>  namenode:
>  datanode:
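>
>(A note on this environment: the compose file runs a single datanode that is
>only addressable inside the Docker network. The "1 node(s) are excluded" part
>of the write error above usually means the HDFS client reached the namenode
>but could not connect to the datanode it was referred to. If the Flink SQL
>client runs on the host rather than inside Docker, a commonly suggested
>workaround, a sketch only and not verified here, is to map the datanode's
>container hostname to 127.0.0.1 in /etc/hosts and give the client an
>hdfs-site.xml so it dials datanodes by hostname instead of by internal IP:
>
><configuration>
>  <property>
>    <!-- connect to datanodes by hostname, which the host can resolve
>         via /etc/hosts, instead of their Docker-internal IPs -->
>    <name>dfs.client.use.datanode.hostname</name>
>    <value>true</value>
>  </property>
></configuration>)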
>
>
>======== hive-site.xml ========
>
><configuration>
>  <property>
>    <name>hive.metastore.uris</name>
>    <value>thrift://localhost:9083</value>
>  </property>
>  <property>
>    <name>javax.jdo.option.ConnectionURL</name>
>    <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
>  </property>
>  <property>
>    <name>javax.jdo.option.ConnectionDriverName</name>
>    <value>org.postgresql.Driver</value>
>  </property>
>  <property>
>    <name>javax.jdo.option.ConnectionPassword</name>
>    <value>hive</value>
>  </property>
>  <property>
>    <name>javax.jdo.option.ConnectionUserName</name>
>    <value>hive</value>
>  </property>
>  <property>
>    <name>hive.metastore.schema.verification</name>
>    <value>true</value>
>  </property>
></configuration>
>
>
>======== sql-client-defaults.yaml ========
>
>################################################################################
># Licensed to the Apache Software Foundation (ASF) under one
># or more contributor license agreements. See the NOTICE file
># distributed with this work for additional information
># regarding copyright ownership. The ASF licenses this file
># to you under the Apache License, Version 2.0 (the
># "License"); you may not use this file except in compliance
># with the License. You may obtain a copy of the License at
>#
>#     http://www.apache.org/licenses/LICENSE-2.0
>#
># Unless required by applicable law or agreed to in writing, software
># distributed under the License is distributed on an "AS IS" BASIS,
># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
># See the License for the specific language governing permissions and
># limitations under the License.
>################################################################################
>
># This file defines the default environment for Flink's SQL Client.
># Defaults might be overwritten by a session specific environment.
>
># See the Table API & SQL documentation for details about supported properties.
>
>#==============================================================================
># Tables
>#==============================================================================
>
># Define tables here such as sources, sinks, views, or temporal tables.
>
>tables: [] # empty list
># A typical table source definition looks like:
># - name: ...
>#   type: source-table
>#   connector: ...
>#   format: ...
>#   schema: ...
>
># A typical view definition looks like:
># - name: ...
>#   type: view
>#   query: "SELECT ..."
>
># A typical temporal table definition looks like:
># - name: ...
>#   type: temporal-table
>#   history-table: ...
>#   time-attribute: ...
>#   primary-key: ...
>
>#==============================================================================
># User-defined functions
>#==============================================================================
>
># Define scalar, aggregate, or table functions here.
>
>functions: [] # empty list
># A typical function definition looks like:
># - name: ...
>#   from: class
>#   class: ...
>#   constructor: ...
>
>#==============================================================================
># Catalogs
>#==============================================================================
>
># Define catalogs here.
>
>catalogs:
>  - name: myhive
>    type: hive
>    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
>    hive-version: 2.3.2
>
>#==============================================================================
># Modules
>#==============================================================================
>
># Define modules here.
>
>#modules: # note the following modules will be of the order they are specified
># - name: core
>#   type: core
>
>#==============================================================================
># Execution properties
>#==============================================================================
>
># Properties that change the fundamental execution behavior of a table program.
>
>execution:
>  # select the implementation responsible for planning table programs
>  # possible values are 'blink' (used by default) or 'old'
>  planner: blink
>  # 'batch' or 'streaming' execution
>  type: streaming
>  # allow 'event-time' or only 'processing-time' in sources
>  time-characteristic: event-time
>  # interval in ms for emitting periodic watermarks
>  periodic-watermarks-interval: 200
>  # 'changelog' or 'table' presentation of results
>  result-mode: table
>  # maximum number of maintained rows in 'table' presentation of results
>  max-table-result-rows: 1000000
>  # parallelism of the program
>  parallelism: 1
>  # maximum parallelism
>  max-parallelism: 128
>  # minimum idle state retention in ms
>  min-idle-state-retention: 0
>  # maximum idle state retention in ms
>  max-idle-state-retention: 0
>  # current catalog ('default_catalog' by default)
>  current-catalog: default_catalog
>  # current database of the current catalog (default database of the catalog by default)
>  current-database: default_database
>  # controls how table programs are restarted in case of failures
>  restart-strategy:
>    # strategy type
>    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
>    type: fallback
>
>#==============================================================================
># Configuration options
>#==============================================================================
>
># Configuration options for adjusting and tuning table programs.
>
># A full list of options and their default values can be found
># on the dedicated "Configuration" web page.
>
># A configuration can look like:
># configuration:
>#   table.exec.spill-compression.enabled: true
>#   table.exec.spill-compression.block-size: 128kb
>#   table.optimizer.join-reorder-enabled: true
>
>#==============================================================================
># Deployment properties
>#==============================================================================
>
># Properties that describe the cluster to which table programs are submitted to.
>
>deployment:
>  # general cluster communication timeout in ms
>  response-timeout: 5000
>  # (optional) address from cluster to gateway
>  gateway-address: ""
>  # (optional) port from cluster to gateway
>  gateway-port: 0
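>
>(For reference, with the catalog defined above the session is switched to
>Hive before querying; a typical sequence, assuming the catalog loads cleanly,
>is:
>
>Flink SQL> use catalog myhive;
>Flink SQL> show tables;
>Flink SQL> select * from pokes;
>)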
>
>
>Cheers,
>Enzo
>
>
>On Tue, 26 May 2020 at 17:15, wldd <wldd1...@163.com> wrote:
>
>> Hi Enzo wang,
>> The images do not load and the GitHub link is unreachable as well. Can you
>> try whether Hive itself can read and write the table normally?
>>
>> --
>> Best,
>> wldd
>>
>> On 2020-05-26 17:01:32, "Enzo wang" <sre.enzow...@gmail.com> wrote:
>>
>> Hi Wldd,
>>
>> Thanks for the reply.
>>
>> 1. The datanode is available:
>>
>> ❯ docker-compose exec namenode hadoop fs -ls /tmp
>> Found 1 items
>> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
>>
>> The namenode web UI shows it as well.
>>
>> 2. After setting "set execution.type=batch;", execution fails with the
>> following error:
>>
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>> could only be replicated to 0 nodes instead of minReplication (=1). There
>> are 1 datanode(s) running and 1 node(s) are excluded in this operation.
>>
>> Full error: https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>>
>>
>> On Tue, 26 May 2020 at 16:52, wldd <wldd1...@163.com> wrote:
>>
>> Problem 1:
>> For the org.apache.hadoop.hdfs.BlockMissingException, you can use hadoop fs
>> commands to check whether that datanode is reachable.
>>
>> Problem 2:
>> Writing to Hive requires batch mode: set execution.type=batch;
>>
>>
>> On 2020-05-26 16:42:12, "Enzo wang" <sre.enzow...@gmail.com> wrote:
>>
>> Hi Flink group,
>>
>> I ran into a few problems today while going through the Flink and Hive
>> integration; could you help take a look?
>> Reference page:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>>
>> Version and table schema details are here:
>> https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>>
>> Problem 1: Flink SQL fails to read Hive table pokes
>>
>> Flink SQL> select * from pokes;
>> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
>>   - Total input paths to process : 4
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
>> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
>> file=/user/hive/warehouse/pokes/kv1.txt
>>
>> Problem 2: Flink SQL fails to write Hive table pokes
>>
>> Flink SQL> insert into pokes select 12,'tom';
>> [INFO] Submitting SQL update statement to the cluster...
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>> Cheers,
>> Enzo