How to implement an event-triggered window?

2021-11-21 Posted by Pinjie Huang
Hi friends,

Flink's built-in windows cover tumbling, sliding, and session, but there seems to be no event-triggered window.

Say, for example, I want to know how many times event A was triggered during the past hour.

With a tumbling window of size 1h:
|1h | 1h |
t=0
reading at t=1.5h returns the data for the hour ending at t=1h, not the real-time result up to now.

With a sliding window I would have to define a very small slide, and results could still lag.

To get real-time results, the state needs to be updated on every event. How can I implement an event-triggered window?
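
One way to get per-event results is to skip the window API and keep keyed state in a KeyedProcessFunction, pruning timestamps older than one hour whenever a new event arrives. Below is a minimal sketch of that idea; the Event type, the String key, and the assumption that event-time timestamps are already assigned are all placeholders rather than anything from this thread.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// For every incoming event, emit how many events this key saw in the hour
// ending at the current event's timestamp. Event is a placeholder POJO.
public class LastHourCount extends KeyedProcessFunction<String, Event, Long> {

    private transient ListState<Long> timestamps;

    @Override
    public void open(Configuration parameters) {
        timestamps = getRuntimeContext().getListState(
                new ListStateDescriptor<>("event-timestamps", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Long> out)
            throws Exception {
        long now = ctx.timestamp();          // assumes timestamps were assigned upstream
        long cutoff = now - 60 * 60 * 1000L; // one hour before this event

        // Keep only timestamps still inside the one-hour range, then add this one.
        List<Long> recent = new ArrayList<>();
        for (Long ts : timestamps.get()) {
            if (ts > cutoff) {
                recent.add(ts);
            }
        }
        recent.add(now);
        timestamps.update(recent);

        // The count is refreshed on every event, not at window boundaries.
        out.collect((long) recent.size());
    }
}
```

If you prefer to stay on the window API, another option is to keep the 1-hour window but attach a custom Trigger whose onElement returns TriggerResult.FIRE, so the window emits an updated early result for each element instead of only when the window closes.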


Re: Reply: flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by RS
Images don't come through on the list; please avoid sending screenshots. You can copy the text out and describe the problem instead.

On 2021-11-22 13:14:13, "zhiyuan su" wrote:

I am using the jar listed above, taken from the 1.13 documentation. It is not labeled with a Flink version, but I assume it was built for Flink 1.13.

This is the yaml file. If I instead define the catalog directly in the SQL client via DDL, I get the same error:
Caused by: java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.Factory: 
org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
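
For reference, a minimal sketch (assuming the Flink 1.13 Table API; the driver class is hypothetical) of the same catalog registration done via DDL, using the catalog name and hive-conf-dir that appear in the stack traces below:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical driver class issuing the same CREATE CATALOG DDL that the
// SQL client runs; per the report above, this DDL path hits the same error.
public class CreateHiveCatalogViaDdl {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE CATALOG myhive WITH (\n"
                        + "  'type' = 'hive',\n"
                        + "  'hive-conf-dir' = '/Users/feng/hive-2.3.6/conf'\n"
                        + ")");
        tEnv.useCatalog("myhive");
    }
}
```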


Reply: flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by zhiyuan su
[image: image.png]
I am using the jar listed above, taken from the 1.13 documentation. It is not labeled with a Flink version, but I assume it was built for Flink 1.13.
[image: image.png]
This is the yaml file. If I instead define the catalog directly in the SQL client via DDL, I get the same error:

> Caused by: java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory:
> org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
>


Re: FlinkSQL ES7 connector unusable

2021-11-21 Posted by mispower
Hi, may I ask how you eventually resolved this issue?

At 2021-06-10 10:15:58, "mokaful" <649713...@qq.com> wrote:
>org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>instantiate user function.
>   at
>org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at
>org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>~[flink-dist_2.11-1.13.1.jar:1.13.1]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
>Caused by: java.io.InvalidClassException:
>org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink$AuthRestClientFactory;
>local class incompatible: stream classdesc serialVersionUID =
>-2564582543942331131, local class serialVersionUID = -2353232579685349916
>   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>~[?:1.8.0_181]
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>~[?:1.8.0_181]
>   at
>java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>~[?:1.8.0_181]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>~[?:1.8.0_181]
>   at
>java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>~[?:1.8.0_181]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>~[?:1.8.0_181]
>   at
>java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>~[?:1.8.0_181]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>~[?:1.8.0_181]
>   at
>java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>~[?:1.8.0_181]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>~[?:1.8.0_181]
>   at
>java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>~[?:1.8.0_181]
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>~[?:1.8.0_181]
>   at
>org.apache.flink.util.

Error creating a hive catalog on JDK 11

2021-11-21 Posted by aiden
Help needed: after upgrading the JDK from 8 to 11, using Hive as the catalog for the Flink
table environment throws an error. I traced it to bsTableEnv.registerCatalog(catalogName, catalog); the exception is:
11:55:22.343 [main] ERROR hive.log - Got exception: 
java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to class 
[Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in module 
java.base of loader 'bootstrap')
java.lang.ClassCastException: class [Ljava.lang.Object; cannot be cast to class 
[Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in module 
java.base of loader 'bootstrap')
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:274)
 [hive-exec-2.1.1.jar:2.1.1]
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:210)
 [hive-exec-2.1.1.jar:2.1.1]
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method) ~[?:?]
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 [?:?]
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 [?:?]
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) 
[?:?]
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1652)
 [hive-exec-2.1.1.jar:2.1.1]
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:80)
 [hive-exec-2.1.1.jar:2.1.1]
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:130)
 [hive-exec-2.1.1.jar:2.1.1]
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:115)
 [hive-exec-2.1.1.jar:2.1.1]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:?]
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:54)
 [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
 [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:78)
 [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:68)
 [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
 [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296) 
[flink-connector-hive_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
 [flink-table-api-java-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
 [flink-table-api-java-1.14.0.jar:1.14.0]
at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27) 
[classes/:?]
at catalogTest.test.main(test.java:11) [classes/:?]
11:55:22.348 [main] ERROR hive.log - Converting exception to MetaException
Exception in thread "main" 
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
Hive Metastore client
at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:61)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:78)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:68)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296)
at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27)
at catalogTest.test.main(test.java:11)
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.
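
For context, a minimal sketch (catalog name, default database, and conf dir are placeholders) of the registerCatalog call that fails here; the stack trace attributes the failure to an Object[] to URI[] cast inside the Hive 2.1.1 HiveMetaStoreClient constructor when run on JDK 11:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Hypothetical reproduction of the failing registration; paths are placeholders.
public class RegisterHiveCatalog {
    public static void main(String[] args) {
        TableEnvironment bsTableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        String catalogName = "hive";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",             // default database
                "/path/to/hive/conf"); // directory containing hive-site.xml (placeholder)

        // HiveCatalog.open() creates the metastore client here; on JDK 11 this
        // is the call that ends in the ClassCastException shown above.
        bsTableEnv.registerCatalog(catalogName, catalog);
        bsTableEnv.useCatalog(catalogName);
    }
}
```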

flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by zhiyuan su
Hello, friends!
   I followed the official documentation and hit an error when connecting to the hive catalog from the sql client.
My hive version 2.3.6
  Flink version 1.13.1

Following the bundled-jar approach from the docs, I added the jars shown in the screenshot below to flink/lib, restarted the cluster, and started sql-client.
Connecting fails with the error below; it looks like a missing jar, but I cannot tell which one. I also tried the second approach with the same error. It seems the catalog cannot be created.
[image: image.png]
Yaml file:
[image: image.png]

Reading session environment from:
file:/Users/feng/flink-1.13.1/bin/../catlog_yaml/hiveCatalog.yaml


Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.
   at
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create
catalog 'myhive'.

Catalog options are:
'hive-conf-dir'='/Users/feng/hive-2.3.6/conf'
'type'='hive'
   at
org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
   at
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
   at
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
   at java.base/java.util.HashMap.forEach(HashMap.java:1336)
   at
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
   at
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
   at
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
   at
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
   at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
   at
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
   ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not load
service provider for factories.
   at
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:507)
   at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:298)
   at
org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
   at
org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
   ... 11 more
Caused by: java.util.ServiceConfigurationError:
org.apache.flink.table.factories.Factory:
org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
   at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
   at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1237)
   at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
   at
java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
   at
java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
   at
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:503)
   ... 14 more


Re: Undeliverable: Re: flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by Shengkai Fang
Also, your images did not come through; please upload them to an image host or just paste the text.

Best,
Shengkai

On Mon, Nov 22, 2021 at 10:15 AM,  wrote:

> *Delivery has failed to these recipients or groups:*
>
> m...@zhangzuofeng.cn
> Your message couldn't be delivered because an email server outside Office
> 365 returned an error that indicates that the recipient's domain (the text
> after the "@" symbol in the recipient's email address) doesn't exist. Only
> the recipient's email admin can fix this problem.
>
> Remote Server returned '550 5.4.355 Remote server returned recipient
> domain does not exist -> 550 Domain may not exist or DNS check failed'

Re: flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by Shengkai Fang
Hi, are you sure the hive jar you are using is the build that matches 1.13?

Best,
Shengkai

On Mon, Nov 22, 2021 at 9:45 AM, drewfranklin wrote:

> Hello
>    I followed the official documentation and hit an error when connecting to the hive catalog from the sql client.
> My hive version 2.3.6
>   Flink version 1.13.1
>
> Following the bundled-jar approach from the docs, I added the jars shown in the screenshot to flink/lib, restarted the cluster, and started sql-client.
> Connecting fails with the error below; it looks like a missing jar, but I cannot tell which one. I also tried the second approach with the same error.
> It seems the catalog connection cannot be created. Registering directly via DDL in the SQL client reports the same error.
> Yaml file:
>
>
> Reading session environment from:
> file:/Users/feng/flink-1.13.1/bin/../catlog_yaml/hiveCatalog.yaml
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>at
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create catalog 'myhive'.
>
> Catalog options are:
> 'hive-conf-dir'='/Users/feng/hive-2.3.6/conf'
> 'type'='hive'
>at
> org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
>at
> org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
>at
> org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
>at java.base/java.util.HashMap.forEach(HashMap.java:1336)
>at
> org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
>at
> org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
>at
> org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
>at
> org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
>at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
>at
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
>... 1 more
> Caused by: org.apache.flink.table.api.TableException: Could not load
> service provider for factories.
>at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:507)
>at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:298)
>at
> org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
>at
> org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
>... 11 more
> Caused by: java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory:
> org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
>at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
>at
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1237)
>at
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
>at
> java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
>at
> java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
>at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:503)
>... 14 more
>
> drewfranklin
> drewfrank...@163.com
>
>


flink1.13.1 sql client connect hivecatalog error

2021-11-21 Posted by drewfranklin
Hello
   I followed the official documentation and hit an error when connecting to the hive catalog from the sql client.
My hive version 2.3.6
  Flink version 1.13.1


Following the bundled-jar approach from the docs, I added the jars shown in the screenshot to flink/lib, restarted the cluster, and started sql-client.
Connecting fails with the error below; it looks like a missing jar, but I cannot tell which one. I also tried the second approach with the same error.
It seems the catalog connection cannot be created. Registering directly via DDL in the SQL client reports the same error.
Yaml file:




Reading session environment from: 
file:/Users/feng/flink-1.13.1/bin/../catlog_yaml/hiveCatalog.yaml


Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
   at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create 
catalog 'myhive'.

Catalog options are:
'hive-conf-dir'='/Users/feng/hive-2.3.6/conf'
'type'='hive'
   at 
org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
   at 
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
   at 
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
   at java.base/java.util.HashMap.forEach(HashMap.java:1336)
   at 
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
   at 
org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
   at 
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
   at 
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
   at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
   ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not load service 
provider for factories.
   at 
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:507)
   at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:298)
   at 
org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
   at 
org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
   ... 11 more
Caused by: java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.Factory: 
org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
   at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
   at 
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1237)
   at 
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
   at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
   at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
   at 
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:503)
   ... 14 more


drewfranklin
drewfrank...@163.com

Re: Where can I download the downloads/setup-pyflink-virtual-env.sh script?

2021-11-21 Posted by Dian Fu
Flink is 1.14.0; what is the PyFlink version inside venv.zip?

On Sun, Nov 21, 2021 at 7:59 PM Asahi Lee  wrote:

> Hi!
>    I managed to submit with the command below; the Python options had to be passed via -D, since passing them via the -py options did not take effect:
> ./flink-1.14.0/bin/flink
> run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
> -Dyarn.application.queue=d
> -Dpython.archives="/opt/venv.zip"
> -Dpython.client.executable="venv.zip/venv/bin/python"
> -Dpython.executable="venv.zip/venv/bin/python"
> -Dpython.files="/opt/test.py"
> -p 1
> -c test.PyUDFTest
> /opt/flink-python-test-1.0-SNAPSHOT.jar
>
>
>
> However, after the job was submitted successfully, it failed at runtime as follows:
>
>
> taskmanager.log
>
>
> 2021-11-19 13:59:24,030 ERROR
> /yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:296
> [] - Error processing instruction 1. Original traceback is
> Traceback (most recent call last):
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
>     response = task()
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in      lambda: self.create_worker().do_instruction(request),
> request)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 638, in process_bundle
>     instruction_id, request.process_bundle_descriptor_id)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 467, in get
>     self.data_channel_factory)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 868, in __init__
>     self.ops =
> self.create_execution_tree(self.process_bundle_descriptor)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 925, in create_execution_tree
>     descriptor.transforms, key=topological_height,
> reverse=True)])
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 924, in      get_operation(transform_id))) for transform_id in sorted(
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 812, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 906, in get_operation
>     pcoll_id in
> descriptor.transforms[transform_id].outputs.items()
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 906, in      pcoll_id in
> descriptor.transforms[transform_id].outputs.items()
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 904, in      tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File
> "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>

Reply: Where can I download the downloads/setup-pyflink-virtual-env.sh script?

2021-11-21 Posted by Asahi Lee
Hi!
   I managed to submit with the command below; the Python options had to be passed via -D, since passing them via the -py options did not take effect:
./flink-1.14.0/bin/flink
run-application -t yarn-application
-Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
-Dyarn.application.queue=d
-Dpython.archives="/opt/venv.zip"
-Dpython.client.executable="venv.zip/venv/bin/python"
-Dpython.executable="venv.zip/venv/bin/python"
-Dpython.files="/opt/test.py"
-p 1
-c test.PyUDFTest
/opt/flink-python-test-1.0-SNAPSHOT.jar



However, after the job was submitted successfully, it failed at runtime as follows:


taskmanager.log


2021-11-19 13:59:24,030 ERROR 
/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:296
 [] - Error processing instruction 1. Original traceback is
Traceback (most recent call last):
  File 
"/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 289, in _execute
    response = task()
  File 
"/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 362, in
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable

On 1.14.0, you can specify -pyclientexec
venv.zip/venv/bin/python

On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalid> wrote:

> I tried `source my_env/bin/activate` and setting PYFLINK_CLIENT_EXECUTABLE,
> but the jobmanager still reports "No module named pyflink". The jobmanager
> log on yarn is as follows:
>
>
> > LogType:jobmanager.out
> > Log Upload Time: Thu Nov 18 20:48:45 +0800 2021
> > LogLength:37
> > Log Contents:
> > /bin/python: No module named pyflink
>
>
>
>
> ------ Original message ------
> From: "user-zh"
> <dian0511...@gmail.com>;
> Date: Fri, Nov 19, 2021, 9:38 AM
> To: "user-zh"
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client
>
> On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid>
> wrote:
>
> > Hi!
> >    I registered a Python UDF through the Java Table API; after submitting
> > to yarn, the python process on the jm reports /bin/python: No module
> > named pyflink.
> >
> >
> > ./flink-1.13.2/bin/flink
> > run-application -t yarn-application
> > -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
> > -Dyarn.application.queue=d
> > -p 1
> > -pyarch /opt/venv.zip
> > -pyexec venv.zip/venv/bin/python
> > -pyfs /opt/test.py
> > -c test.PyUDFTest
> > /opt/flink-python-test-1.0-SNAPSHOT.jar
> >
> >
> >
> > The error is as follows:
> > Caused by: java.lang.RuntimeException: Python callback server start failed!
> >         at
> > org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
> > ~[flink-python_2.11-1.13.2.jar:1.13.2]
> >         at
> > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
> > ~[flink-python_2.11-1.13.2.jar:1.13.2]
> >         at
> > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
> > ~[flink-python_2.11-1.13.2.jar:1.13.2]
> >         at
> > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >         at
> > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >         at
> > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >         at
> > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >         at
> > org.apache.flink