Re: flink-1.12: cannot specify YARN parameters after selecting the deployment mode with -t

2021-01-27, posted by silence
Since Flink 1.12, all YARN-related parameters are specified via -D.
Example: -Dyarn.application.name=xxx replaces the old -ynm xxx.
For more options, see the configuration documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn
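
As a minimal sketch (the application name, memory sizes, main class, and jar path below are placeholders, not from the original message), a per-job submission on YARN could look like:

    flink run -t yarn-per-job \
      -Dyarn.application.name=my-job \
      -Djobmanager.memory.process.size=2g \
      -Dtaskmanager.memory.process.size=4g \
      -Dtaskmanager.numberOfTaskSlots=2 \
      -c com.example.MyJob /path/to/my-job.jar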





A Flink job that mixes DataStream and SQL: no watermark is generated after submitting to YARN

2021-01-27, posted by 花乞丐
When I run the Flink job locally, it consumes the data from Kafka and writes it into Hive successfully. But after I submit the job to YARN, the Flink Web UI shows every sink in the "no watermark" state. The files are written to HDFS successfully, yet the partitions are never committed to the metastore and no success file is created, i.e. the watermark has no effect. It works locally, so why does it fail on YARN?

The code is shown below. This is my first time using Flink, so am I using it the wrong way?
package com.x.flink.app.incr;

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.x.flink.contranst.TopicPattern;
import com.x.flink.executions.TradeOrderExecutions;
import com.x.flink.function.RowsFlatMapFunction;
import com.x.flink.schema.FlatMessageSchema;
import com.x.flink.utils.ConfigUtils;
import com.x.flink.utils.TableResolveUtils;
import com.x.flink.watermark.RowWatermarkAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.time.Duration;

/**
 * flink run \
 *   -m yarn-cluster \
 *   -ys 2 \
 *   -yjm 2g \
 *   -ytm 4g \
 *   -c com.x.flink.app.incr.TradeOrderBinlogResolveApp \
 *   -d \
 *   /opt/tools/flink-1.12.0/x-realtime-etl-1.0-SNAPSHOT.jar
 */
public class TradeOrderBinlogResolveApp {
    public static void main(String[] args) {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism
        env.setParallelism(8);
        // enable checkpointing
        env.enableCheckpointing(6);
        // set the watermark generation interval
        env.getConfig().setAutoWatermarkInterval(200);
        // set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // catalog name
        String catalogName = "devHive";
        // create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                ConfigUtils.HIVE_CONF_DIR,
                ConfigUtils.HADOOP_CONF_DIR,
                ConfigUtils.HIVE_VERSION
        );
        // register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // use the Hive catalog
        tableEnv.useCatalog(catalogName);

        // get the schemas of the target tables
        RowTypeInfo tradeOrderTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_incr", tableEnv);
        RowTypeInfo tradeOrderItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_item_incr", tableEnv);
        RowTypeInfo tradeRealDeliveryTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_real_delivery_incr", tableEnv);
        RowTypeInfo tradeSteelItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_steel_item_incr", tableEnv);

        // build the Kafka consumer for the non-finance business topics
        FlinkKafkaConsumerBase<FlatMessage> messages = new FlinkKafkaConsumer<>(
                TopicPattern.TRADE_PATTERN,
                new FlatMessageSchema(),
                ConfigUtils.getKafkaConfig())
                .setStartFromEarliest();
        // assign timestamps and watermarks to each record
        FlinkKafkaConsumerBase<FlatMessage> messagesWaters = messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                            @Override
                            public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                                return element.getEs();
                            }
                        })
        );


        // add the source
        DataStreamSource messageSources =

Re: When all Kafka partitions have no data, the watermark cannot advance

2021-01-27, posted by 林影
In our actual production environment, the business requires event time.

wpp <1215303...@qq.com> wrote on Thu, Jan 28, 2021, 2:54 PM:

> You could handle it with processing time instead
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: over window losing data

2021-01-27, posted by Appleyuchi
The issue has been resolved, please ignore it. Thanks.














On 2021-01-28 11:42:08, "Appleyuchi" wrote:
>Hi all,
>
>
>I am studying the following over window example:
>https://help.aliyun.com/document_detail/62514.html
>
>
>My complete flink sql client steps are here:
>https://yuchi.blog.csdn.net/article/details/113128072
>
>
>Question:
>I input 8 rows,
>but only 7 rows are returned.
>Why is data being lost?
>
>
>Any help is appreciated, thanks!


Reply: When all Kafka partitions have no data, the watermark cannot advance

2021-01-27, posted by 刘小红
You can call WatermarkStrategy.withIdleness(Duration idleTimeout) to specify an idle timeout,
so that idle sources do not hold the watermark back and stall downstream operators.
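
A minimal sketch of wiring this into a watermark strategy (assuming a FlatMessage-style record with a getEs() timestamp as in the other threads; the "consumer" variable and durations are illustrative only):

    WatermarkStrategy<FlatMessage> strategy =
            WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getEs())
                    // mark a source/partition as idle after 1 minute without data,
                    // so it no longer holds back the overall watermark
                    .withIdleness(Duration.ofMinutes(1));
    consumer.assignTimestampsAndWatermarks(strategy);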


刘小红
18500348...@163.com
On Jan 28, 2021, 14:42, wpp <1215303...@qq.com> wrote:
You could handle it with processing time instead





Re: When all Kafka partitions have no data, the watermark cannot advance

2021-01-27, posted by wpp
You could handle it with processing time instead





Re: On end-to-end latency monitoring

2021-01-27, posted by wpp
This latency metric is only meant as a rough reference.





On end-to-end latency monitoring

2021-01-27, posted by 13051111332


Hi Everyone:

As far as I understand it, the officially provided LatencyMarker mechanism is not recommended for production use, and it does not pass through the operators' internal processing logic, so it can only give a rough estimate of latency. Does anyone have a better approach for monitoring end-to-end latency?
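
One commonly used alternative (a sketch only, not from this thread; "MyEvent" and "getEventTime()" are hypothetical names for a record that carries its own event timestamp) is to report the observed event-time lag near the sink as a custom metric:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;

    public class LatencyReporter extends RichMapFunction<MyEvent, MyEvent> {
        private transient long lastLagMs;

        @Override
        public void open(Configuration parameters) {
            // expose the last observed lag as a gauge, e.g. for external scraping
            getRuntimeContext().getMetricGroup()
                    .gauge("eventTimeLagMs", (Gauge<Long>) () -> lastLagMs);
        }

        @Override
        public MyEvent map(MyEvent value) {
            // wall-clock time minus the event's own timestamp = end-to-end lag so far
            lastLagMs = System.currentTimeMillis() - value.getEventTime();
            return value;
        }
    }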





Re: When deploying an application in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-27, posted by lp
Thanks for the answer. I checked my pom.xml and the packaged archive, and they do contain the kafka-related dependencies (org.apache.kafka.common). So I set the relevant dependencies in the pom to provided scope, repackaged, and confirmed that the resulting jar no longer contains any kafka dependencies. After deploying and running it again, the jobmanager now fails directly with: Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
---
I suspect my cluster deployment may be the problem. This is what I did:
I installed a 3-node hadoop (and yarn) cluster [master (NameNode, SecondaryNameNode, ResourceManager), slave01 (DataNode, NodeManager), slave02 (DataNode, NodeManager)]. On the master node I extracted flink-1.12.1.tar.gz and placed the hadoop dependency jar flink-shaded-hadoop-2-uber-2.8.3-8.0.jar in its lib directory. I then uploaded my jar to /opt on that node and, from the flink directory, submitted it to the yarn cluster in application mode with: bin/flink run-application -t yarn-application /opt/quickstart-0.1.jar. The jobmanager container was allocated on slave02, and its jobmanager.log reports the error above.
--
I came over from Spark; spark on yarn does not require deploying on every node. Is flink on yarn the same? If it is not too much trouble, could you advise what the correct setup should be?




When all Kafka partitions have no data, the watermark cannot advance

2021-01-27, posted by 林影
Hi Everyone, a question about watermarks:
the watermark advances based on the event time of incoming events. If the upstream has no data, the watermark does not advance. How should this case be handled?


Flink reading a Hive table: how to set the TTL

2021-01-27, posted by macia kk
The documentation sets lookup.join.cache.ttl at create-table time.

But I want to join streaming Kafka data against a Hive table that already exists. How do I set the TTL in that case?

CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...) TBLPROPERTIES (
  'streaming-source.enable' = 'false',           -- option with default value, can be ignored.
  'streaming-source.partition.include' = 'all',  -- option with default value, can be ignored.
  'lookup.join.cache.ttl' = '12 h');
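
For a table that already exists in the Hive metastore, one possible approach (a sketch under the assumption that dynamic table options are enabled; the table and column names are placeholders) is to pass the property as a SQL hint on the lookup join instead of altering the table:

    // allow /*+ OPTIONS(...) */ hints in queries
    tableEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);
    tableEnv.executeSql(
            "SELECT o.product_id, d.product_name " +
            "FROM orders AS o " +
            "JOIN dimension_table /*+ OPTIONS('lookup.join.cache.ttl'='12 h') */ " +
            "  FOR SYSTEM_TIME AS OF o.proc_time AS d " +
            "ON o.product_id = d.product_id");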


A question about LEFT JOIN producing DELETE messages

2021-01-27, posted by DONG, Weike
Hi all,

Recently, while working with a LEFT JOIN statement, I noticed an odd behavior. Suppose we have the following SQL:

CREATE TABLE A (
key INT
) WITH (
'connector' = 'kafka',
);

CREATE TABLE B (
  key INT
) WITH (
'connector' = 'kafka',
);

CREATE TABLE Sink (
id INTEGER,
upsert_value BIGINT,
primary key (`id`) NOT ENFORCED
) WITH (
'connector.type' = 'elasticsearch',
'update-mode' = 'upsert',
-- optionally the 'append' mode without a primary key, or the 'upsert' mode with a primary key
);

INSERT INTO Sink
SELECT A.key, SUM(B.key)
FROM A LEFT JOIN B ON A.key = B.key
GROUP BY A.key;


The left table of the LEFT JOIN is A and the right table is B. Then:

*Scenario 1.* Table A receives a row with key=100 and it initially fails to join against B (B has no key=100 row yet). An upsert message (true, 100, null) is emitted to the downstream ES sink. If B later receives a key=100 row, Flink emits a DELETE message (false, 100, null) and then an UPSERT message (e.g. true, 100, 100) to update the downstream result. After that, no DELETE message is ever emitted again.

*Scenario 2.* Table A receives a row with key=100 and it joins against B successfully on the first attempt (B already has key=100). No DELETE message is emitted; an upsert message (true, 100, 100) is output directly, and no DELETE message is ever emitted afterwards.


*Question:*

Is the DELETE message emitted by the LEFT JOIN in scenario 1 really necessary? My understanding is that emitting only the upsert message would also work for scenario 1, so the DELETE seems of little use. Moreover, the DELETE message clears some fields of the corresponding doc id (if that doc holds other fields not defined in the Flink table), causing unexpected field loss.

I read the code of GroupAggFunction and saw the logic below. Could someone explain what situation this design is meant to guard against? Many thanks :)

[image: image.png]


over window losing data

2021-01-27, posted by Appleyuchi
Hi all,


I am studying the following over window example:
https://help.aliyun.com/document_detail/62514.html


My complete flink sql client steps are here:
https://yuchi.blog.csdn.net/article/details/113128072


Question:
I input 8 rows,
but only 7 rows are returned.
Why is data being lost?


Any help is appreciated, thanks!

Re: Generated SQL code exceeds 64 KB and fails to compile

2021-01-27, posted by stgztsw
The limit is not only in janino: we already tried removing janino's 64 KB limit, but the same limit then shows up in the JDK. Spark currently avoids similar problems with spark.conf.set("spark.sql.codegen.wholeStage", false). So I think the best approach would be a configuration option that restricts some SQL logical-plan optimizations so that the logic of different views is not merged together. We could then split the SQL to spread the logic across different generated functions and avoid a single class method that is too large.

Caused by: java.lang.ClassFormatError: Invalid method Code length 123732 in
class file ExpressionReducer$3674
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
 at
org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:77)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
 at
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
 ... 38 more





Persisting catalog DDL (Kafka connector tables) to the hive metastore: every job shares the same group.id

2021-01-27, posted by 孙啸龙
Hi:
Version: 1.12.0
The DDL statement is persisted to the hive metastore.
The create statement is as follows:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
When multiple applications use this table, they all end up with the same group id, 'properties.group.id' = 'testGroup'.
Is this a problem, and how do you handle it?
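
One possible workaround (a sketch only, assuming dynamic table options are enabled; 'app_a_group' is a placeholder) is for each job to override the group id with a SQL hint instead of changing the DDL stored in the metastore:

    tableEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);
    tableEnv.executeSql(
            "SELECT * FROM KafkaTable /*+ OPTIONS('properties.group.id'='app_a_group') */");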


My Flink SQL job died after running for a day and 5 hours with this error; what could be the cause?

2021-01-27, posted by nick
I am running with a single slot.





java.util.concurrent.TimeoutException: Invocation of public abstract
java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
timed out.
at
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.executiongraph.Execution.requestBackPressure(Execution.java:976)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator.requestBackPressure(BackPressureRequestCoordinator.java:156)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator.triggerBackPressureRequest(BackPressureRequestCoordinator.java:141)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl.triggerBackPressureRequestInternal(BackPressureStatsTrackerImpl.java:154)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl.getOperatorBackPressureStats(BackPressureStatsTrackerImpl.java:121)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.requestOperatorBackPressureStats(SchedulerBase.java:837)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.requestOperatorBackPressureStats(JobMaster.java:730)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@bd-01:35498/user/rpc/taskmanager_0#-406132916]]
after [15000 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 

Re: How to use a custom Flink UDF with flink-sql-gateway

2021-01-27, posted by Appleyuchi
https://blog.csdn.net/appleyuchi/article/details/112837327








On 2021-01-27 15:25:41, "阿华田" wrote:
>Hi all, when submitting a Flink SQL job with Flink-sql-gateway, how can a custom Flink UDF be used?
>
>
>阿华田
>a15733178...@163.com
>


Re: Flink SQL CSV format delimiter setting fails

2021-01-27, posted by JasonLee
hi

Change the delimiters to the following:

\n => U&'\000A'

\t => U&'\0009'
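
For example, a minimal sketch of a DDL using the Unicode escape for a tab field delimiter (the table name, columns, and path are placeholders; note the backslash is doubled because the statement is embedded in a Java string):

    tableEnv.executeSql(
            "CREATE TABLE csv_source (" +
            "  id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = '/tmp/input'," +
            "  'format' = 'csv'," +
            "  'csv.field-delimiter' = U&'\\0009'" +
            ")");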



-
Best Wishes
JasonLee


Re: pyflink 1.11 table.to_pandas() raises 'Sort on a non-time-attribute field is not supported.'

2021-01-27, posted by Xingbo Huang
From the error it looks like you called order_by. For an unbounded
table, order_by can only be applied to a time attribute field; otherwise it must be followed by a fetch operation. See the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#orderby-offset--fetch

Best,
Xingbo

肖越 <18242988...@163.com> wrote on Wed, Jan 27, 2021, 5:44 PM:

> sql_query returns a table object; calling table.to_pandas() on it raises:
> Traceback (most recent call last):
>   File
> "C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
> line 161, in 
> print(table.to_pandas().head(6))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
> line 723, in to_pandas
> .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute
> field is not supported.'
> Could anyone explain why the conversion fails? print_schema on the table itself works fine.


Flink-1.11 consuming Kafka and writing to Hive: consumer lag keeps growing

2021-01-27, posted by nashcen
*Average rate at which flink consumes from kafka = (8810108 - 8646583)/5 = 32705 records/minute*
That is only about 2 million records consumed per hour, which is far too slow; about 8 million records have already piled up. How should this be handled?

*21/01/27 20:05:46* INFO clients.Metadata: Cluster ID:
h-faeyjNRhS5xcAUy1JR2Q
Consumer group 'stg_dcpoints_hive' has no active members.

TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID HOSTCLIENT-ID
ods_dcpoints_prod 0  6599508940  6608319048  *8810108 *   
-   -   -

*21/01/27 20:10:45* INFO clients.Metadata: Cluster ID:
h-faeyjNRhS5xcAUy1JR2Q
Consumer group 'stg_dcpoints_hive' has no active members.

TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID HOSTCLIENT-ID
ods_dcpoints_prod 0  6599672465  6608319048  *8646583 *   
-   -   -





Re: How to use a custom Flink UDF with flink-sql-gateway

2021-01-27, posted by Sebastian Liu
1. Make sure the udf jar is on the classpath of the gateway JVM.
2. Configure it here:
https://github.com/ververica/flink-sql-gateway/blob/master/conf/sql-gateway-defaults.yaml#L87

阿华田 wrote on Wed, Jan 27, 2021, 3:26 PM:

> Hi all, when submitting a Flink SQL job with Flink-sql-gateway, how can a custom Flink UDF be used?
>
>
> 阿华田
> a15733178...@163.com
>
>

-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: flink-1.12.1 k8s session cluster mode does not generate a stdout output file

2021-01-27, posted by Tianwang Li
docker-entrypoint.sh starts the processes in "start-foreground" mode,
so no .out file is generated.


Tianwang Li wrote on Wed, Jan 27, 2021, 5:38 PM:

> I followed:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
>
> deployed a session cluster, and ran a job.
> The Stdout view in the UI is empty and no .out file was generated.
>
> Is there some configuration I am missing?
>
> --
> **
>  tivanli
> **
>


-- 
**
 tivanli
**


Re: Generated SQL code exceeds 64 KB and fails to compile

2021-01-27, posted by Sebastian Liu
This requires changes in the code generation. You can add the ExpressionReducer problem described above to the following issue,
and I can help fix it: https://issues.apache.org/jira/browse/FLINK-20898
Please also attach a test SQL example.

stgztsw wrote on Wed, Jan 27, 2021, 6:10 PM:

> The generated SQL code exceeds 64 KB and fails to compile. Is there any way to work around this? I tried splitting the SQL logic into multiple views,
> but the SQL optimizer merges them back together, so it cannot be avoided.
>
> Caused by: org.codehaus.janino.InternalCompilerException: Code of method
> "map(Ljava/lang/Object;)Ljava/lang/Object;" of class
> "ExpressionReducer$3674" grows beyond 64 KB
> at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1009)
> at org.codehaus.janino.CodeContext.write(CodeContext.java:901)
> at
> org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12195)
> at
> org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:10660)
> at
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5553)
> at
> org.codehaus.janino.UnitCompiler.access$9300(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4423)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$IntegerLiteral.accept(Java.java:5442)
> at
> org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at
> org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:3719)
> at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5569)
> at
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)
> at
> org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at
> org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
> at
> org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
> at
>
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at
> org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at
>
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at
> org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at
> org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at
> org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
> at
> org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
>
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
>
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at
> org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)

flink sql with a custom watermark

2021-01-27, posted by 花乞丐
I use Flink to consume messages from Kafka, assign a watermark to each message, convert the messages to Row, and write them into Hive. But when the partition-time commit is triggered, the watermark is always the largest negative long value, so the partitions are never committed and no data can be queried from Hive, even though the files do exist on HDFS. I am not sure yet what is causing this.

FlinkKafkaConsumerBase<FlatMessage> waterMessages = messages.assignTimestampsAndWatermarks(
        WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                    @Override
                    public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                        Long es = element.getEs();
                        return es;
                    }
                })
);

INSERT INTO ods.trade_real_delivery_incr
SELECT
  id,
  original_id,
  order_id,
  order_code,
  business_type,
  delivery_id,
  num,
  weight,
  creator_id,
  creator,
  admin_id,
  admin_name,
  admin_depart_id,
  admin_depart_code,
  admin_depart_name,
  create_time,
  update_time,
  es,
  ts,
  op,
  dt
FROM trade_real_delivery_tmp






Re: Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-27, posted by zilong xiao
What would the session-client do? Maintain and manage the sessions?

felixzh wrote on Wed, Jan 27, 2021, 5:49 PM:

> If you use flink-sql-gateway, I suggest wrapping your own session-client modeled on jdbc
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-27 14:41:28, "Jeff Zhang" wrote:
> >zeppelin has a rest api:
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
> >
> >jinsx wrote on Wed, Jan 27, 2021, 2:30 PM:
> >
> >> If we use zeppelin, can zeppelin provide an rpc interface?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >
> >
> >--
> >Best Regards
> >
> >Jeff Zhang
>


Generated SQL code exceeds 64 KB and fails to compile

2021-01-27, posted by stgztsw
The generated SQL code exceeds 64 KB and fails to compile. Is there any way to work around this? I tried splitting the SQL logic into multiple views,
but the SQL optimizer merges them back together, so it cannot be avoided.

Caused by: org.codehaus.janino.InternalCompilerException: Code of method
"map(Ljava/lang/Object;)Ljava/lang/Object;" of class
"ExpressionReducer$3674" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1009)
at org.codehaus.janino.CodeContext.write(CodeContext.java:901)
at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12195)
at 
org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:10660)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5553)
at org.codehaus.janino.UnitCompiler.access$9300(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4423)
at
org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4394)
at org.codehaus.janino.Java$IntegerLiteral.accept(Java.java:5442)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:3719)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5569)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(Java.java:2776)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
at
org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
... 51 more





Re: Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-27, posted by felixzh
If you use flink-sql-gateway, I suggest wrapping your own session-client modeled on jdbc

















On 2021-01-27 14:41:28, "Jeff Zhang" wrote:
>zeppelin has a rest api: https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
>
>jinsx wrote on Wed, Jan 27, 2021, 2:30 PM:
>
>> If we use zeppelin, can zeppelin provide an rpc interface?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


pyflink 1.11 table.to_pandas() raises 'Sort on a non-time-attribute field is not supported.'

2021-01-27, posted by 肖越
sql_query returns a table object; calling table.to_pandas() on it raises:
Traceback (most recent call last):
  File 
"C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 161, in 
print(table.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 723, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute field is 
not supported.'
Could anyone explain why the conversion fails? print_schema on the table itself works fine.

Re: Questions about flink-sql-gateway

2021-01-27, posted by zilong xiao
OK, will do.

Lin Li wrote on Wed, Jan 27, 2021, 5:20 PM:

> try:  "execution.target: yarn-pre-job"  ->  "execution.target:
> yarn-per-job"
>
> zilong xiao wrote on Wed, Jan 27, 2021, 10:17 AM:
> >
> > Thanks for your answer. After setting "execution.target: yarn-pre-job" in flink-conf.yaml, I hit an
> > exception when trying to create a session id via the rest api. I am not sure why; could you take a look?
> >
> > flink version: 1.11.3
> > execution.target: yarn-pre-job
> > rest api request path and parameters:
> > http://localhost:8083/v1/sessions
> > {
> > "planner": "blink",
> > "execution_type": "streaming"
> >}
> >
> > 异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
> > found. If you were targeting a Yarn cluster, please make sure to export
> the
> > HADOOP_CLASSPATH environment variable or have hadoop in your classpath.
> For
> > more information refer to the "Deployment & Operations" section of the
> > official Apache Flink documentation.
> >
> > Sebastian Liu wrote on Wed, Jan 27, 2021, 1:01 AM:
> >
> > > The way the sql gateway submits jobs is similar to the flink client and mainly depends on the execution.target setting in flink-conf.
> > > For per-job mode on yarn the corresponding value is "yarn-per-job".
> > > The PipelineExecutor loaded in that case comes from YarnJobClusterExecutorFactory, which can submit jobs to Yarn; this ends up on the same call path as submitting a job with the flink
> > > client.
> > > The remaining step, as I understand it, is to add the yarn-related settings to flink-conf:
> > > org.apache.flink.yarn.configuration.YarnConfigOptions
> > >
> > > zilong xiao wrote on Tue, Jan 26, 2021, 4:00 PM:
> > >
> > > > Is there any documentation on submitting a job to a yarn cluster in per-job mode through the flink-sql-gateway rest api?
> > > >
> > >
> > >
> > > --
> > >
> > > *With kind regards
> > > 
> > > Sebastian Liu 刘洋
> > > Institute of Computing Technology, Chinese Academy of Science
> > > Mobile\WeChat: +86—15201613655
> > > E-mail: liuyang0...@gmail.com 
> > > QQ: 3239559*
> > >
> >
>


flink-1.12.1 k8s session cluster mode does not generate a stdout output file

2021-01-27, posted by Tianwang Li
I followed:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

deployed a session cluster, and ran a job.
The Stdout view in the UI is empty and no .out file was generated.

Is there some configuration I am missing?

-- 
**
 tivanli
**


Re: Questions about flink-sql-gateway

2021-01-27, posted by Lin Li
try:  "execution.target: yarn-pre-job"  ->  "execution.target: yarn-per-job"

zilong xiao wrote on Wed, Jan 27, 2021, 10:17 AM:

> Thanks for your answer. After setting "execution.target: yarn-pre-job" in flink-conf.yaml, I hit an
> exception when trying to create a session id via the rest api. I am not sure why; could you take a look?
>
> flink version: 1.11.3
> execution.target: yarn-pre-job
> rest api request path and parameters:
> http://localhost:8083/v1/sessions
> {
> "planner": "blink",
> "execution_type": "streaming"
>}
>
> 异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
> found. If you were targeting a Yarn cluster, please make sure to export the
> HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For
> more information refer to the "Deployment & Operations" section of the
> official Apache Flink documentation.
>
> Sebastian Liu wrote on Wed, Jan 27, 2021, 1:01 AM:
>
> > The way the sql gateway submits jobs is similar to the flink client and mainly depends on the execution.target setting in flink-conf.
> > For per-job mode on yarn the corresponding value is "yarn-per-job".
> > The PipelineExecutor loaded in that case comes from YarnJobClusterExecutorFactory, which can submit jobs to Yarn; this ends up on the same call path as submitting a job with the flink
> > client.
> > The remaining step, as I understand it, is to add the yarn-related settings to flink-conf:
> > org.apache.flink.yarn.configuration.YarnConfigOptions
> >
> > zilong xiao wrote on Tue, Jan 26, 2021, 4:00 PM:
> >
> > > Is there any documentation on submitting a job to a yarn cluster in per-job mode through the flink-sql-gateway rest api?
> > >
> >
> >
> > --
> >
> > *With kind regards
> > 
> > Sebastian Liu 刘洋
> > Institute of Computing Technology, Chinese Academy of Science
> > Mobile\WeChat: +86—15201613655
> > E-mail: liuyang0...@gmail.com 
> > QQ: 3239559*
> >
>


Re: Flink sql 1.12 writing to hive fails with a metastore error

2021-01-27, posted by Rui Li
Hi,

The stacktrace you posted is only a warning. Are there any other exceptions?

On Wed, Jan 27, 2021 at 10:47 AM gimlee  wrote:

> Using flink sql 1.12 to write to hive, the job is not submitted to yarn successfully; the error is as follows:
> 2021-01-26 20:44:23.133 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Trying to connect
> to
> metastore with URI thrift://hdcom02.prd.com:9083
> 2021-01-26 20:44:23.133 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Trying to connect
> to
> metastore with URI thrift://hdcom02.prd.com:9083
> 2021-01-26 20:44:23.134 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Opened a connection
> to metastore, current connections: 2
> 2021-01-26 20:44:23.134 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Opened a connection
> to metastore, current connections: 2
> 2021-01-26 20:44:23.181 [main] WARN
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - set_ugi() not
> successful, Likely cause: new client talking to old server. Continuing
> without it.
> org.apache.thrift.transport.TTransportException: null
> at
>
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> at
> org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> at
>
> org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380)
> at
>
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
> at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
> at
>
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:4787)
> at
>
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:4773)
> at
>
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:534)
> at
>
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:224)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
>
> org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:95)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:112)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:274)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:80)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> at
>
> org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:145)
> at
>
> org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:137)
> at
>
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:109)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at 

Documentation issue

2021-01-27, posted by xiaolailong



Reply: Unsubscribe

2021-01-27, posted by Roc Marshal
Hi, Tang.
Please send a short message to user-zh-unsubscr...@flink.apache.org if you want
to unsubscribe from the mailing list.


Best, Roc.


Roc Marshal
flin...@126.com


On Jan 27, 2021, 16:41, 唐军亮 wrote:
Unsubscribe

Unsubscribe

2021-01-27, posted by 唐军亮
Unsubscribe