回复:回复:使用StreamingFileSink向hive metadata中增加分区部分失败
就第二次提供的日志看,好像是你的namenode出现的问题 -- 发件人:MuChen <9329...@qq.com> 发送时间:2020年9月8日(星期二) 10:56 收件人:user-zh@flink.apache.org 夏帅 ; user-zh 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 在checkpoint失败的时间,tm上还有一些info和warn级别的日志: 2020-09-04 17:17:59,520 INFO org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while invoking create of class ClientNamenodeProtocolTranslatorPB over uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to fail over immediately. java.io.IOException: java.lang.InterruptedException at org.apache.hadoop.ipc.Client.call(Client.java:1449) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.ipc.Client.call(Client.java:1401) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?] at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_144] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144] at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?] at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependenci
回复:使用StreamingFileSink向hive metadata中增加分区部分失败
异常日志只有这些么?有没有详细点的
回复: flink1.9写权限认证的es6
get到了 来自钉钉专属商务邮箱-- 发件人:Yangze Guo 日 期:2020年07月17日 13:38:35 收件人:user-zh 主 题:Re: flink1.9写权限认证的es6 Hi, SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能 [1] https://issues.apache.org/jira/browse/FLINK-18361 Best, Yangze Guo On Fri, Jul 17, 2020 at 10:12 AM Dream-底限 wrote: > > hi: > 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下
回复:flink1.9写权限认证的es6
你好,请问是FlinkSQL么 FLinkSQL可以参考下这份邮件 http://apache-flink.147419.n8.nabble.com/ddl-es-td2094.html DataStream可以尝试自定义ElasticsearchSink实现权限认证 -- 发件人:Dream-底限 发送时间:2020年7月17日(星期五) 10:12 收件人:user-zh 主 题:flink1.9写权限认证的es6 hi: 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下
回复:flink connector formats问题
你好,这个是可以进行自定义的 参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/ -- 发件人:酷酷的浑蛋 发送时间:2020年7月17日(星期五) 10:42 收件人:user-zh 主 题:flink connector formats问题 请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢? 目前flink支持的: | 格式 | 支持的连接器 | | CSV | Apache Kafka, Filesystem | | JSON | Apache Kafka, Filesystem, Elasticsearch | | Apache Avro | Apache Kafka, Filesystem | | Debezium CDC | Apache Kafka | | Canal CDC | Apache Kafka | | Apache Parquet | Filesystem | | Apache ORC | Filesystem |
回复:回复: 不能实时读取实时写入到 Hive 的数据
你好, 可以参考下这个问题的解决 http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html -- 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:50 收件人:user-zh ; 夏帅 ; Leonard Xu 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = ':9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; wangl...@geekplus.com.cn Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 -- 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh ; xbjtdcq 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI > 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn >
回复:Re: 不能实时读取实时写入到 Hive 的数据
你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 -- 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh ; xbjtdcq 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI > 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn >
回复:Re: flink 同时sink hbase和hive,hbase少记录
你好, 本质还是StreamingFileSink,所以目前只能append -- 发件人:Zhou Zach 发送时间:2020年7月14日(星期二) 10:56 收件人:user-zh 主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录 Hi Leonard, 原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式 在 2020-07-14 09:56:00,"Leonard Xu" 写道: >Hi, > >> 在 2020年7月14日,09:52,Zhou Zach 写道: >> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, > >看下这个抽取出来的rowkey是否有重复的呢? > >祝好, >Leonard Xu
回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
你好, 你设置了1个小时的 SINK_PARTITION_COMMIT_DELAY -- 发件人:Zhou Zach 发送时间:2020年7月13日(星期一) 17:09 收件人:user-zh 主 题:Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector. 开了checkpoint, val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE) streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000) 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据 在 2020-07-13 16:52:16,"Jingsong Li" 写道: >有开checkpoint吧?delay设的多少? > >Add partition 在 checkpoint完成 + delay的时间后 > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach wrote: > >> Hi, >> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add >> partition到hive表吗,我当前设置了参数 >> 'sink.partition-commit.policy.kind'='metastore' >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" wrote: >> >Hi, >> > >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog >> > >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息 >> > >> >Best, >> >Jingsong >> > >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach wrote: >> > >> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置 >> >> >> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 >> >> h','sink.partition-commit.policy.kind'='success-file'); >> >> 也报错误 >> >> query: >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) >> >> streamTableEnv.executeSql( >> >> """ >> >> | >> >> | >> >> |CREATE TABLE hive_table ( >> >> | user_id STRING, >> >> | age INT >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet >> >> TBLPROPERTIES ( >> >> | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', >> >> | 'sink.partition-commit.trigger'='partition-time', >> >> | 'sink.partition-commit.delay'='1 h', >> >> | 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> |) >> >> | >> >> |""".stripMargin) >> >> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) >> >> streamTableEnv.executeSql( >> >> """ >> >> | >> >> |CREATE TABLE kafka_table ( >> >> |uid VARCHAR, >> >> |-- uid BIGINT, >> >> |sex VARCHAR, >> >> |age INT, >> >> |created_time TIMESTAMP(3), >> >> |WATERMARK FOR created_time as created_time - INTERVAL '3' >> SECOND >> >> |) WITH ( >> >> |'connector.type' = 'kafka', >> >> |'connector.version' = 'universal', >> >> | 'connector.topic' = 'user', >> >> |-- 'connector.topic' = 'user_long', >> >> |'connector.startup-mode' = 'latest-offset', >> >> |'connector.properties.zookeeper.connect' = >> >> 'cdh1:2181,cdh2:2181,cdh3:2181', >> >> |'connector.properties.bootstrap.servers' = >> >> 'cdh1:9092,cdh2:9092,cdh3:9092', >> >> |'connector.properties.group.id' = 'user_flink', >> >> |'format.type' = 'json', >> >> |'format.derive-schema' = 'true' >> >> |) >> >> |""".stripMargin) >> >> >> >> >> >> >> >> streamTableEnv.executeSql( >> >> """ >> >> | >> >> |INSERT INTO hive_table >> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), >> >> DATE_FORMAT(created_time, 'HH') >> >> |FROM kafka_table >> >> | >> >> |""".stripMargin) >> >> >> >> streamTableEnv.executeSql( >> >> """ >> >> | >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' >> >> | >> >> |""".stripMargin) >> >> .print() >> >> 错误栈: >> >> Exception in thread "main" >> org.apache.flink.table.api.ValidationException: >> >> Unable to create a sink for writing table >> >> 'default_catalog.default_database.hive_table'. >> >> >> >> Table options are: >> >> >> >> 'hive.storage.file-format'='parquet' >> >> 'is_generic'='false' >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' >> >> 'sink.partition-commit.delay'='1 h' >> >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> 'sink.partition-commit.trigger'='partition-time' >> >> at >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> at >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> at >> >> >>
回复:flink 1.11 使用sql写入hdfs无法自动提交分区
你好, 我这边同样的代码,并没有出现类似的问题 是本地跑么,可以提供下日志信息么?
回复:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
感谢
回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
你好, 可以看看你的代码结构是不是以下这种 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) .. tableEnv.execute("") 如果是的话,可以尝试使用bsEnv.execute("") 1.11对于两者的execute代码实现有改动 -- 发件人:Zhou Zach 发送时间:2020年7月8日(星期三) 15:30 收件人:Flink user-zh mailing list 主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology 代码在flink 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) 但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 query: streamTableEnv.executeSql( """ | |CREATE TABLE `user` ( |uid BIGINT, |sex VARCHAR, |age INT, |created_time TIMESTAMP(3), |WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND |) WITH ( |'connector.type' = 'kafka', |'connector.version' = 'universal', |-- 'connector.topic' = 'user', |'connector.topic' = 'user_long', |'connector.startup-mode' = 'latest-offset', |'connector.properties.group.id' = 'user_flink', |'format.type' = 'json', |'format.derive-schema' = 'true' |) |""".stripMargin) streamTableEnv.executeSql( """ | |CREATE TABLE user_hbase3( |rowkey BIGINT, |cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( |'connector.type' = 'hbase', |'connector.version' = '2.1.0', |'connector.table-name' = 'user_hbase2', |'connector.zookeeper.znode.parent' = '/hbase', |'connector.write.buffer-flush.max-size' = '10mb', |'connector.write.buffer-flush.max-rows' = '1000', |'connector.write.buffer-flush.interval' = '2s' |) |""".stripMargin) streamTableEnv.executeSql( """ | |insert into user_hbase3 |SELECT uid, | | ROW(sex, age, created_time ) as cf | FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from `user`) | |""".stripMargin)
回复:FlinkKafkaProducer没有写入多个topic的功能
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景 class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] { override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue) } } -- 发件人:18579099...@163.com <18579099...@163.com> 发送时间:2020年7月8日(星期三) 10:59 收件人:user-zh 主 题:FlinkKafkaProducer没有写入多个topic的功能 我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题), 但是FlinkKafkaProducer好像只能写入一个主题里面? 18579099...@163.com
回复:【Flink的shuffle mode】
补充: 1.11的shuffle-mode配置的默认值为ALL_EDGES_BLOCKING 共有 ALL_EDGES_BLOCKING(等同于batch) FORWARD_EDGES_PIPELINEDPOINTWISE_EDGES_PIPELINED ALL_EDGES_PIPELINED(等同于pipelined)对于pipelined多出了两种选择 -- 发件人:忝忝向仧 <153488...@qq.com> 发送时间:2020年7月7日(星期二) 23:37 收件人:user-zh 主 题:回复: 【Flink的shuffle mode】 如果是批的模式,怎么在应用程序里面指定shuffle_mode呢? 另外,下面提到如果是流的计算,一定是pipeline模式. 那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢? 谢谢. --原始邮件-- 发件人:"Jingsong Li"
回复:【Flink的shuffle mode】
你好: 问题1,指定shuffle_mode tEnv.getConfig.getConfiguration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "pipeline") 问题2,mode是UNDEFINED的概念 使用UNDEFINED并不是说模式没有定义,而是由框架自己决定 The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode. -- 发件人:忝忝向仧 <153488...@qq.com> 发送时间:2020年7月7日(星期二) 23:37 收件人:user-zh 主 题:回复: 【Flink的shuffle mode】 如果是批的模式,怎么在应用程序里面指定shuffle_mode呢? 另外,下面提到如果是流的计算,一定是pipeline模式. 那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢? 谢谢. --原始邮件-- 发件人:"Jingsong Li"
回复:【Flink的shuffle mode】
你好,可以参考下ExecutionConfigOptions,OptimizerConfigOptions和GlobalConfiguration,里面有比较清楚地介绍 -- 发件人:忝忝向仧 <153488...@qq.com> 发送时间:2020年7月6日(星期一) 12:16 收件人:user-zh 主 题:回复:【Flink的shuffle mode】 那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送? 发自我的iPhone -- 原始邮件 -- 发件人: Jingsong Li
回复:Position out of bounds.
不好意思,看错了,这里是自增了 来自钉钉专属商务邮箱-- 发件人:xuhaiLong 日 期:2020年07月02日 18:46:37 收件人:夏帅 抄 送:user-zh 主 题:回复:Position out of bounds. 感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? | | 夏* | | 邮箱:xiagu...@163.com | 签名由 网易邮箱大师 定制 在2020年07月02日 18:39,夏帅 写道: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
回复:Position out of bounds.
你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
回复:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?
你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once Kafka011TableSink @Override protected SinkFunction createKafkaProducer( String topic, Properties properties, SerializationSchema serializationSchema, Optional> partitioner) { return new FlinkKafkaProducer011<>( topic, new KeyedSerializationSchemaWrapper<>(serializationSchema), properties, partitioner, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE, 5); } 如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase 参考: https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/ -- 发件人:静谧雨寒 发送时间:2020年7月1日(星期三) 14:33 收件人:user-zh 主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once? flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql sink表使用两阶事务提交,exactly-once一致性保证 ? 官档说法: Consistency guarantees: By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled., CREATE TABLE 默认是 at-least-once
回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题
你好,我试了一下,纯DataStream的方式是可以使用的,具体使用参考`flink-formats\flink-parquet\src\test\java\org\apache\flink\formats\parquet\avro\ParquetStreamingFileSinkITCase` 在Table转DataStream的方式中,我是先将Table转换为DataStream[Row],然后再进行转换生成DataStream[GenericRecord] dataStream.map(x => { ...val fields = new util.ArrayList[Schema.Field] fields.add(new Schema.Field("platform", create(org.apache.avro.Schema.Type.STRING), "platform", null)) fields.add(new Schema.Field("event", create(org.apache.avro.Schema.Type.STRING), "event", null)) fields.add(new Schema.Field("dt", create(org.apache.avro.Schema.Type.STRING), "dt", null)) val parquetSinkSchema: Schema = createRecord("pi", "flinkParquetSink", "flink.parquet", true, fields) val record = new GenericData.Record(parquetSinkSchema).asInstanceOf[GenericRecord] record.put("platform", x.get(0)) record.put("event", x.get(1)) record.put("dt", x.get(2)) record }) -- 发件人:yingbo yang 发送时间:2020年6月29日(星期一) 10:04 收件人:夏帅 抄 送:user-zh 主 题:Re: flink1.10 使用 ParquetAvroWriters schema 模式写数据问题 你好: 可以使用 GenericRecordAvroTypeInfo 这个类型,但是这个类型只适合于 table 中只有一个 字段的情况;否则会出现异常: 代码: ArrayList fields = new ArrayList(); fields.add(new org.apache.avro.Schema.Field("id", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "id", JsonProperties.NULL_VALUE)); fields.add(new org.apache.avro.Schema.Field("time", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "time", JsonProperties.NULL_VALUE)); org.apache.avro.Schema parquetSinkSchema = org.apache.avro.Schema.createRecord("pi", "flinkParquetSink", "flink.parquet", true, fields); String fileSinkPath = "./xxx.text/rs6/"; GenericRecordAvroTypeInfo genericRecordAvroTypeInfo = new GenericRecordAvroTypeInfo(parquetSinkSchema); DataStream testDataStream1 = flinkTableEnv.toAppendStream(test, genericRecordAvroTypeInfo); testDataStream1.print().setParallelism(1); StreamingFileSink parquetSink = StreamingFileSink. forBulkFormat(new Path(fileSinkPath), ParquetAvroWriters.forGenericRecord(parquetSinkSchema)) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); testDataStream1.addSink(parquetSink).setParallelism(1); flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava"); 异常: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/yyb/Software/localRepository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/yyb/Software/localRepository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] root |-- id: STRING |-- time: STRING 09:40:35,872 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields 09:40:35,874 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields 09:40:35,874 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 09:40:36,191 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields 09:40:36,191 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields 09:40:36,191 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: Arity [2] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@2149594a] does not match the number[1] of requested type [GenericRecord("{"type":"error","name":"pi","namespace":"flink.parquet","doc":"flinkParquetSink","fields":[{"name":"id","type":&quo
回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题
你好,这个问题从异常来看是使用TupleTypeInfo导致的,可以试下使用GenericRecordAvroTypeInfo -- 发件人:yingbo yang 发送时间:2020年6月28日(星期日) 17:38 收件人:user-zh 主 题:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题 Hi: 在使用 ParquetAvroWriters.forGenericRecord(Schema schema) 写parquet文件的时候 出现 类转化异常: 下面是我的代码: // //transfor 2 dataStream // TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(GenericData.Record.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo); testDataStream.print().setParallelism(1); ArrayList fields = new ArrayList(); fields.add(new org.apache.avro.Schema.Field("id", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "id", JsonProperties.NULL_VALUE)); fields.add(new org.apache.avro.Schema.Field("time", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "time", JsonProperties.NULL_VALUE)); org.apache.avro.Schema parquetSinkSchema = org.apache.avro.Schema.createRecord("pi", "flinkParquetSink", "flink.parquet", true, fields); String fileSinkPath = "./xxx.text/rs6/"; StreamingFileSink parquetSink = StreamingFileSink. forBulkFormat(new Path(fileSinkPath), ParquetAvroWriters.forGenericRecord(parquetSinkSchema)) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); testDataStream.addSink(parquetSink).setParallelism(1); flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava"); 下面是异常: 09:29:50,283 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING to FAILED.09:29:50,283 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING to FAILED.java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.apache.avro.generic.IndexedRecord at org.apache.avro.generic.GenericData.getField(GenericData.java:697) at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188) at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165) at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128) at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299) at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748)09:29:50,284 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering task and sending final execution state FAILED to JobManager for task Sink: Unnamed (1/1) 79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING to FAILED.java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.apache.avro.generic.IndexedRecord at
Flink新出的OrcBulkWriterFactory有没有大佬给个详细的Demo
自己在使用时,会有文件生成,但是文件内并不包含数据
回复:Flink1.11-release编译部署后sql-client的bug
好的,感谢 -- 发件人:godfrey he 发送时间:2020年6月2日(星期二) 12:32 收件人:user-zh 抄 送:夏帅 主 题:Re: Flink1.11-release编译部署后sql-client的bug Hi, 夏帅 感谢反馈问题,我建了一个issue https://issues.apache.org/jira/browse/FLINK-18055,应该今天就可以fix Best, Godfrey Leonard Xu 于2020年6月2日周二 下午12:13写道: Hi, 夏帅 感谢反馈,这应该是个bug,我 这边本地也复现了,我先看下哈 祝好, Leonard Xu > 在 2020年6月2日,11:57,夏帅 写道: > > 是我编译的问题么,在window下编译的
Flink1.11-release编译部署后sql-client的bug
大家好,有人编译部署过flink-1.11-release么,为什么我使用sql-client时设置了catalog 但是并不生效,顺带自动补全也不太好使 是我编译的问题么,在window下编译的 编译步骤见链接 https://jxeditor.github.io/2020/06/01/Flink1.11.0%E7%BC%96%E8%AF%91/ $ mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00) Flink SQL> show catalogs; default_catalog hive Flink SQL> use catalog hive; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [`hive`] does not exist.
回复:Kafka Consumer反序列化错问题
可以排除一下是否是jar包冲突 -- 发件人:Even <452232...@qq.com> 发送时间:2020年5月29日(星期五) 16:17 收件人:user-zh 主 题:Kafka Consumer反序列化错问题 Hi! 请教一个Kafka Consumer反序列问题: 一个kafkaconsumerjob 提交到Flink session cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下: 其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)) 2020-05-2717:05:22 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.