Have you tested writing data through Hive itself? Judging by the exception you provided, this looks like an HDFS problem.
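
One way to narrow this down from the machine that runs the Flink SQL client is to check whether the HDFS ports are reachable at all. The sketch below is only an illustration: the helper names `can_connect` and `check_endpoints` are made up for this example, and the host name `datanode` and port 50010 (the Hadoop 2.x default for `dfs.datanode.address`) are assumptions, since the compose file in this thread only publishes 8020, 50070 and 50075.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


def check_endpoints(endpoints, timeout=2.0):
    """Map each (host, port) pair to whether it is reachable from this machine."""
    return {(host, port): can_connect(host, port, timeout) for host, port in endpoints}
```

For example, `check_endpoints([("namenode", 8020), ("datanode", 50010)])` returns a dict of reachability flags. If the namenode answers but the datanode port does not, that would be consistent with the "could only be replicated to 0 nodes" message, though it is only a hint and not a confirmed diagnosis.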

--

Best,
wldd





At 2020-05-26 17:49:56, "Enzo wang" <sre.enzow...@gmail.com> wrote:
>Hi Wldd,
>
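>Hive is accessible. I have pasted the contents of the two earlier gists into this email; please take a look. They include the results of querying the data through Hive.
>
>Let me know if you need any other information.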
>
>
>
>========  flink insert into hive error ========
>
>org.apache.flink.table.api.TableException: Exception in close
>       at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
>       at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>       at java.base/java.lang.Thread.run(Thread.java:830)
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
>       at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
>
>       at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1413)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>       at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
>
>======== Flink 1.10.0 lib directory ========
>
>mysql-connector-java-5.1.48.jar
>slf4j-log4j12-1.7.15.jar
>log4j-1.2.17.jar
>flink-table_2.11-1.10.0.jar
>flink-table-blink_2.11-1.10.0.jar
>flink-dist_2.11-1.10.0.jar
>flink-jdbc_2.11-1.10.0.jar
>flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
>flink-sql-connector-kafka_2.11-1.10.0.jar
>flink-json-1.10.0.jar
>flink-connector-hive_2.11-1.10.0.jar
>flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
>hive-exec-2.3.7.jar
>flink-csv-1.10.1.jar
>
>
>======== Hive table "pokes" ========
>
>❯ docker-compose exec hive-server bash
>root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
>SLF4J: Class path contains multiple SLF4J bindings.
>SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>Connecting to jdbc:hive2://localhost:10000
>Connected to: Apache Hive (version 2.3.2)
>Driver: Hive JDBC (version 2.3.2)
>Transaction isolation: TRANSACTION_REPEATABLE_READ
>Beeline version 2.3.2 by Apache Hive
>0: jdbc:hive2://localhost:10000> describe formatted pokes;
>+-------------------------------+------------------------------------------------------------+-----------------------+
>|           col_name            |                         data_type                          |        comment        |
>+-------------------------------+------------------------------------------------------------+-----------------------+
>| # col_name                    | data_type                                                  | comment               |
>|                               | NULL                                                       | NULL                  |
>| foo                           | int                                                        |                       |
>| bar                           | string                                                     |                       |
>|                               | NULL                                                       | NULL                  |
>| # Detailed Table Information  | NULL                                                       | NULL                  |
>| Database:                     | default                                                    | NULL                  |
>| Owner:                        | root                                                       | NULL                  |
>| CreateTime:                   | Tue May 26 05:42:30 UTC 2020                               | NULL                  |
>| LastAccessTime:               | UNKNOWN                                                    | NULL                  |
>| Retention:                    | 0                                                          | NULL                  |
>| Location:                     | hdfs://namenode:8020/user/hive/warehouse/pokes             | NULL                  |
>| Table Type:                   | MANAGED_TABLE                                              | NULL                  |
>| Table Parameters:             | NULL                                                       | NULL                  |
>|                               | numFiles                                                   | 4                     |
>|                               | numRows                                                    | 0                     |
>|                               | rawDataSize                                                | 0                     |
>|                               | totalSize                                                  | 5839                  |
>|                               | transient_lastDdlTime                                      | 1590480090            |
>|                               | NULL                                                       | NULL                  |
>| # Storage Information         | NULL                                                       | NULL                  |
>| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe         | NULL                  |
>| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat                   | NULL                  |
>| OutputFormat:                 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL                  |
>| Compressed:                   | No                                                         | NULL                  |
>| Num Buckets:                  | -1                                                         | NULL                  |
>| Bucket Columns:               | []                                                         | NULL                  |
>| Sort Columns:                 | []                                                         | NULL                  |
>| Storage Desc Params:          | NULL                                                       | NULL                  |
>|                               | serialization.format                                       | 1                     |
>+-------------------------------+------------------------------------------------------------+-----------------------+
>30 rows selected (0.328 seconds)
>0: jdbc:hive2://localhost:10000>
>
>0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
>+------------+------------+
>| pokes.foo  | pokes.bar  |
>+------------+------------+
>| 25         | Tommy      |
>| 26         | Tommy      |
>| 27         | Tommy      |
>| 238        | val_238    |
>| 86         | val_86     |
>| 311        | val_311    |
>| 27         | val_27     |
>| 165        | val_165    |
>| 409        | val_409    |
>| 255        | val_255    |
>+------------+------------+
>10 rows selected (0.622 seconds)
>0: jdbc:hive2://localhost:10000>
>
>
>======== Hive table "pokes" in Flink ========
>
>Flink SQL> describe pokes;
>root
> |-- foo: INT
> |-- bar: STRING
>
>
>======== hadoop/hive environment ========
>
>version: "3"
>
>services:
>  namenode:
>    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - namenode:/hadoop/dfs/name
>    environment:
>      - CLUSTER_NAME=test
>    env_file:
>      - ./hadoop-hive.env
>    ports:
>      - "50070:50070"
>      - "8020:8020"
>  datanode:
>    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - datanode:/hadoop/dfs/data
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070"
>    ports:
>      - "50075:50075"
>  hive-server:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
>      SERVICE_PRECONDITION: "hive-metastore:9083"
>    ports:
>      - "10000:10000"
>  hive-metastore:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    command: /opt/hive/bin/hive --service metastore
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
>    ports:
>      - "9083:9083"
>  hive-metastore-postgresql:
>    image: bde2020/hive-metastore-postgresql:2.3.0
>    ports:
>      - "5432:5432"
>  presto-coordinator:
>    image: shawnzhu/prestodb:0.181
>    ports:
>      - "8080:8080"
>
>volumes:
>  namenode:
>  datanode:
>
>
>======== hive-site.xml ========
>
><configuration>
>    <property>
>        <name>hive.metastore.uris</name>
>        <value>thrift://localhost:9083</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionURL</name>
>        <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionDriverName</name>
>        <value>org.postgresql.Driver</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionPassword</name>
>        <value>hive</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionUserName</name>
>        <value>hive</value>
>    </property>
>   <property>
>       <name>hive.metastore.schema.verification</name>
>       <value>true</value>
>   </property>
></configuration>
>
>
>======== sql-client-defaults.yaml ========
>
>################################################################################
>#  Licensed to the Apache Software Foundation (ASF) under one
>#  or more contributor license agreements.  See the NOTICE file
>#  distributed with this work for additional information
>#  regarding copyright ownership.  The ASF licenses this file
>#  to you under the Apache License, Version 2.0 (the
>#  "License"); you may not use this file except in compliance
>#  with the License.  You may obtain a copy of the License at
>#
>#      http://www.apache.org/licenses/LICENSE-2.0
>#
>#  Unless required by applicable law or agreed to in writing, software
>#  distributed under the License is distributed on an "AS IS" BASIS,
>#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>#  See the License for the specific language governing permissions and
># limitations under the License.
>################################################################################
>
>
># This file defines the default environment for Flink's SQL Client.
># Defaults might be overwritten by a session specific environment.
>
>
># See the Table API & SQL documentation for details about supported properties.
>
>
>#==============================================================================
># Tables
>#==============================================================================
>
># Define tables here such as sources, sinks, views, or temporal tables.
>
>tables: [] # empty list
># A typical table source definition looks like:
># - name: ...
>#   type: source-table
>#   connector: ...
>#   format: ...
>#   schema: ...
>
># A typical view definition looks like:
># - name: ...
>#   type: view
>#   query: "SELECT ..."
>
># A typical temporal table definition looks like:
># - name: ...
>#   type: temporal-table
>#   history-table: ...
>#   time-attribute: ...
>#   primary-key: ...
>
>
>#==============================================================================
># User-defined functions
>#==============================================================================
>
># Define scalar, aggregate, or table functions here.
>
>functions: [] # empty list
># A typical function definition looks like:
># - name: ...
>#   from: class
>#   class: ...
>#   constructor: ...
>
>
>#==============================================================================
># Catalogs
>#==============================================================================
>
># Define catalogs here.
>
>catalogs:
>  - name: myhive
>    type: hive
>    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
>    hive-version: 2.3.2
>
>
>#==============================================================================
># Modules
>#==============================================================================
>
># Define modules here.
>
>#modules: # note the following modules will be of the order they are specified
>#  - name: core
>#    type: core
>
>#==============================================================================
># Execution properties
>#==============================================================================
>
># Properties that change the fundamental execution behavior of a table program.
>
>execution:
>  # select the implementation responsible for planning table programs
>  # possible values are 'blink' (used by default) or 'old'
>  planner: blink
>  # 'batch' or 'streaming' execution
>  type: streaming
>  # allow 'event-time' or only 'processing-time' in sources
>  time-characteristic: event-time
>  # interval in ms for emitting periodic watermarks
>  periodic-watermarks-interval: 200
>  # 'changelog' or 'table' presentation of results
>  result-mode: table
>  # maximum number of maintained rows in 'table' presentation of results
>  max-table-result-rows: 1000000
>  # parallelism of the program
>  parallelism: 1
>  # maximum parallelism
>  max-parallelism: 128
>  # minimum idle state retention in ms
>  min-idle-state-retention: 0
>  # maximum idle state retention in ms
>  max-idle-state-retention: 0
>  # current catalog ('default_catalog' by default)
>  current-catalog: default_catalog
>  # current database of the current catalog (default database of the catalog by default)
>  current-database: default_database
>  # controls how table programs are restarted in case of a failures
>  restart-strategy:
>    # strategy type
>    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
>    type: fallback
>
>#==============================================================================
># Configuration options
>#==============================================================================
>
># Configuration options for adjusting and tuning table programs.
>
># A full list of options and their default values can be found
># on the dedicated "Configuration" web page.
>
># A configuration can look like:
># configuration:
>#   table.exec.spill-compression.enabled: true
>#   table.exec.spill-compression.block-size: 128kb
>#   table.optimizer.join-reorder-enabled: true
>
>#==============================================================================
># Deployment properties
>#==============================================================================
>
># Properties that describe the cluster to which table programs are submitted to.
>
>deployment:
>  # general cluster communication timeout in ms
>  response-timeout: 5000
>  # (optional) address from cluster to gateway
>  gateway-address: ""
>  # (optional) port from cluster to gateway
>  gateway-port: 0
>
>
>
>
>
>Cheers,
>Enzo
>
>
>On Tue, 26 May 2020 at 17:15, wldd <wldd1...@163.com> wrote:
>
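>> Hi, Enzo wang
>> The images fail to load and the GitHub link is not reachable either. Could you check whether Hive itself can read and write the table normally?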
>>
>> --
>>
>> Best,
>> wldd
>>
>>
>>
>>
>> At 2020-05-26 17:01:32, "Enzo wang" <sre.enzow...@gmail.com> wrote:
>>
>> Hi Wldd,
>>
>>
>> Thanks for the reply.
>>
>>
>> 1.  The datanode is available
>>
>>
>> ❯ docker-compose exec namenode hadoop fs -ls /tmp
>> Found 1 items
>> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
>>
>>
>> It is also visible in the namenode web UI:
>>
>>
>>
>>
>> 2.  After running set execution.type=batch;, the statement fails with the following error:
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File
>> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>> could only be replicated to 0 nodes instead of minReplication (=1). There
>> are 1 datanode(s) running and 1 node(s) are excluded in this operation.
>>
>>
>> The full error is at:
>> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>>
>>
>>
>> On Tue, 26 May 2020 at 16:52, wldd <wldd1...@163.com> wrote:
>>
>> Issue 1:
>>
>> org.apache.hadoop.hdfs.BlockMissingException: you can use hadoop fs
>> commands to check whether that datanode is reachable.
>>
>>
>> Issue 2:
>> Writing to Hive requires batch mode: set execution.type=batch;
>>
>>
>>
>>
>>
>>
>>
>> At 2020-05-26 16:42:12, "Enzo wang" <sre.enzow...@gmail.com> wrote:
>>
>> Hi Flink group,
>>
>>
>> While looking into the Flink and Hive integration today I ran into a few problems, and I would appreciate your help.
>> Reference URL:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>>
>>
>> Versions and table schema are here: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>>
>>
>> Issue 1: Flink SQL fails to read the Hive table pokes
>>
>>
>> Flink SQL> select * from pokes;
>> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 4
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
>> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
>> file=/user/hive/warehouse/pokes/kv1.txt
>>
>>
>>
>>
>>
>>
>>
>> Issue 2: Flink SQL fails to write to the Hive table pokes
>>
>>
>> Flink SQL> insert into pokes select 12,'tom';
>> [INFO] Submitting SQL update statement to the cluster...
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>
>>
>>
>>
>>
>>
>> Cheers,
>> Enzo
