Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
Hi yidan, 你可以試試 SQL Hints [1]. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/ yidan zhao 於 2021年11月2日 週二 下午1:03寫道: > 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 > 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在 > flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。 > 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。 > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > 'sink.partition-commit.trigger'='partition-time', > > 'sink.partition-commit.delay'='1 h', > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > Caizhi Weng 于2021年11月2日周二 上午10:47写道: > > > Hi! > > > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 > > hive 里表的结构等信息。但 kafka 的表定义仍然要写。 > > > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 > > > > yidan zhao 于2021年11月1日周一 下午3:05写道: > > > > > 如题,我看了官方文档,定义好kafka和hive表。 > > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > > > > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > > > user_id STRING, > > > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > > > parquet TBLPROPERTIES ( > > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > > 'sink.partition-commit.trigger'='partition-time', > > > 'sink.partition-commit.delay'='1 h', > > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > > > user_id STRING, > > > order_amount DOUBLE, > > > log_ts TIMESTAMP(3), > > > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > > > watermark on TIMESTAMP column) WITH (...); > > > > > > > > > > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > > > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > > > > > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 > > > > > >
Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在 flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。 > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file'); Caizhi Weng 于2021年11月2日周二 上午10:47写道: > Hi! > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 > hive 里表的结构等信息。但 kafka 的表定义仍然要写。 > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 > > yidan zhao 于2021年11月1日周一 下午3:05写道: > > > 如题,我看了官方文档,定义好kafka和hive表。 > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > > user_id STRING, > > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > > parquet TBLPROPERTIES ( > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > 'sink.partition-commit.trigger'='partition-time', > > 'sink.partition-commit.delay'='1 h', > > 'sink.partition-commit.policy.kind'='metastore,success-file'); > > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > > user_id STRING, > > order_amount DOUBLE, > > log_ts TIMESTAMP(3), > > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > > watermark on TIMESTAMP column) WITH (...); > > > > > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 > > >
flink yarn-per-job dubug
hello, ??flink 1.14.0?? maven 3.2.5 ?? 1. ??mvn clean install -DskipTests -Dfast 2. ??flink-1.14.0/flink-dist/target/flink-1.14.0-bin/ ?? 3. conf/flink-conf.yaml ??env.java.opts: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5003" 4. ??idea(windows )?? ??Remote ?? Trantsport: Socket Host: xxx.xxx.xx.xx Port: 5003 5. ??yarn-per-job ?? bin/flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --hostname xxx.xxx.xx.xx --port 6. ??idea ??debug ??debug flink yarn-per-job jobmanager??YarnJobClusterEntrypoint main ??YarnJobClusterEntrypoint debug?
Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题
Hi! hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取 hive 里表的结构等信息。但 kafka 的表定义仍然要写。 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。 yidan zhao 于2021年11月1日周一 下午3:05写道: > 如题,我看了官方文档,定义好kafka和hive表。 > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 > > SET table.sql-dialect=hive;CREATE TABLE hive_table ( > user_id STRING, > order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS > parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file'); > SET table.sql-dialect=default;CREATE TABLE kafka_table ( > user_id STRING, > order_amount DOUBLE, > log_ts TIMESTAMP(3), > WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define > watermark on TIMESTAMP column) WITH (...); > > > 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。 >
Re: Re: Re: 公司数据密文,实现group by和join
上传的图片没法显示,通过图床工具或纯文本方式重新发一遍 lyh1067341434 于2021年11月1日周一 上午10:42写道: > 您好! > > 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组; > 为了更清楚表达,下面为图示: > > 谢谢您! > > > > > > > > 在 2021-10-29 10:49:35,"Caizhi Weng" 写道: > >Hi! > > > >你是不是想写这样的 SQL: > > > >SELECT id, sum(price) AS total_price FROM ( > > SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 ON > >decrypt_udf(T1.id, T2.id) = 1 > >) GROUP BY id > > > >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。 > > > >lyh1067341434 于2021年10月29日周五 上午9:41写道: > > > >> 您好! > >> > >> > >> 感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了; > >> > >> > >> 比如 有一张表 a, 表a有 > >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA"); > >> > >> > >> 如果我想求 不同id组的price之和: > >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确; > >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确; > >> > >> > >> > >> > >> 问题: > >> > >> > >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确; > >>我看到api只能指定分组的key是哪一个; > >> > >> > >> 谢谢您! > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2021-10-28 11:09:26,"Caizhi Weng" 写道: > >> >Hi! > >> > > >> >没太明白你的需求。你的需求是不是 > >> > > >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。 > >> > > >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。 > >> > > >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。 > >> > > >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by > >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。 > >> > > >> >[1] > >> > > >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/ > >> > > >> >lyh1067341...@163.com 于2021年10月27日周三 下午5:20写道: > >> > > >> >> 您好: > >> >> > >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group > >> >> by,对于使用spark和flink算子不知道如何实现。 > >> >> > >> >> 问题: > >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口) > >> >> > >> >> 谢谢您。 > >> >> > >> >> > >> >> > >> >> 发自 网易邮箱大师 > >> > > > > >
关于FlinkSQL从kafka读取数据写到hive的一些问题
如题,我看了官方文档,定义好kafka和hive表。 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。 SET table.sql-dialect=hive;CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file'); SET table.sql-dialect=default;CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column) WITH (...); 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
Re: standalone集群重启后自动回复任务,任务的jobmaster如果失败会导致JM进程失败
补充个更完整的日志: 2021-11-01 14:15:15,849 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:181) - Recovered JobGraph(jobId: dfced635fd8c224222a9cbaaf1c5054f). 2021-11-01 14:15:15,849 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125) - Successfully recovered 1 persisted job graphs. 2021-11-01 14:15:15,856 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:232) - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2021-11-01 14:15:22,867 INFO [30-flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.start(DefaultLeaderElectionService.java:93) - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/dfced635fd8c224222a9cbaaf1c5054f/job_manager_lock'}. 2021-11-01 14:15:22,892 ERROR [30-flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.entrypoint.ClusterEntrypoint.onFatalError(ClusterEntrypoint.java:454) - Fatal error occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: JobMaster for job dfced635fd8c224222a9cbaaf1c5054f failed. at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:418) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_152] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_152] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] at
standalone集群重启后自动回复任务,任务的jobmaster如果失败会导致JM进程失败
如题,这个问题之前遇到过,当时我email问的是集群不断重启。 这次也是这个问题,集群不断重启,但分析下原因如题。看日志片段如下: 2021-11-01 14:05:36,954 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:181) - Recovered JobGraph(jobId: dfced635fd8c224222a9cbaaf1c5054f). 2021-11-01 14:05:36,954 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125) - Successfully recovered 1 persisted job graphs. 2021-11-01 14:05:36,962 INFO [78-cluster-io-thread-1] org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:232) - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2021-11-01 14:05:44,810 INFO [94-flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.start(DefaultLeaderElectionService.java:93) - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/dfced635fd8c224222a9cbaaf1c5054f/job_manager_lock'}. 2021-11-01 14:05:44,836 ERROR [94-flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.entrypoint.ClusterEntrypoint.onFatalError(ClusterEntrypoint.java:454) - Fatal error occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: JobMaster for job dfced635fd8c224222a9cbaaf1c5054f failed. at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT] 如上,恢复了jobgraph,开启 leader 选举(看起来像是jobmaster的leader选举服务),然后jobmaster 挂了。 如上,我想知道为什么jobmaster挂了就会导致 standalone JM 进程失败呢? JM进程是所有任务公用,即使启动后之前的某个job无法恢复,也没必要因此就挂掉吧。