转发:

2023-04-13 文章 孟令平


发件人: Henry meng (孟令平)
发送时间: 2023年4月13日 15:27
收件人: 'user-zh-i...@flink.apache.org' 
主题:


public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(6);
  SingleOutputStreamOperator dataStream = env.addSource(new 
FlinkRocketMQConsumer("10.164.15.31:9876","flink_data"))
.uid("source-id");
  dataStream .print("**");
  DataStream datStream =dataStream.process(new MyFunction());
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  datStream.print("=>");
  Table inputTable = 
tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId"
  tableEnv.createTemporaryView("InputTable", inputTable);
  Table resultTable = tableEnv.sqlQuery(
"select devId,'DAY_POWER_GENERATION' as identify, 
(MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " +
  "tenantId from InputTable where 
identifier='TOTAL_POWER_GENERATION' group by devId,tenantId");
//resultTable.print();
  DataStream resultStream = tableEnv.toChangelogStream(resultTable);
  resultStream.print(">");
//tableEnv.createTemporaryView("resultTable", resultTable);

  tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID` 
BIGINT,`IDENTIFIER` String," +
"`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID` 
BIGINT) " +
"WITH ('connector' = 'print')");
//tableEnv.executeSql("insert into print_table select 
devId,identifier,dataValue,dataTime,tenantId from resultTable");
  inputTable.executeInsert("print_table");
  env.execute("FlinkRocketMQConsumerDemo");}

如上代码所示,当我添加上env.execute()方法后,发现print_table不打印数据了这是什么原因?
StreamExecutionEnvironment.execute()和StreamTableEnvironment.executeSql()同时执行有什么问题吗?


回复: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-12 文章 徐战辉


hi, Yuxia,   
这边是想咨询下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。

目前有一份作业,开启checkpoint,  cancel 后重新启动,发现数据会丢失1小部分。




1. flink.conf


execution.checkpointing.interval: 1
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://**:8020/flink/checkpoints
state.savepoints.dir: hdfs://:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ..
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- 有将flink 1.15针对的补丁(FLINK-24697)打上

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ..
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '360',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)
Best Regards

| |
Jerry Guo
|
|
wangyixuhongm...@163.com
|
 回复的原邮件 
| 发件人 | yuxia |
| 发送日期 | 2022年5月12日 15:16 |
| 收件人 | user-zh |
| 主题 | Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |
hi,可以解释一下具体是想咨询什么问题?

Best regards,
Yuxia

- 原始邮件 -
发件人: "徐战辉" 
收件人: "user-zh" 
发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00
主题: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

| |
Jerry Guo
|
|
wangyixuhongm...@163.com
|
 转发的原邮件 
| 发件人 | 徐战辉 |
| 发送日期 | 2022年5月12日 10:38 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |


Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-12 文章 yuxia
hi,可以解释一下具体是想咨询什么问题?

Best regards,
Yuxia

- 原始邮件 -
发件人: "徐战辉" 
收件人: "user-zh" 
发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00
主题: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

| |
Jerry Guo
|
|
wangyixuhongm...@163.com
|
 转发的原邮件 
| 发件人 | 徐战辉 |
| 发送日期 | 2022年5月12日 10:38 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |


转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-11 文章 徐战辉




| |
Jerry Guo
|
|
wangyixuhongm...@163.com
|
 转发的原邮件 
| 发件人 | 徐战辉 |
| 发送日期 | 2022年5月12日 10:38 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |

转发: flinkcdc:slave with the same server_uuid/server_id as this slave has connected to the master;

2022-03-14 文章 maker_d...@foxmail.com
时隔一个月又遇到了这个问题,现在有人能帮忙解决一下吗?



maker_d...@foxmail.com
 
发件人: maker_d...@foxmail.com
发送时间: 2022-02-15 14:13
收件人: user-zh@flink.apache.org
主题: flinkcdc:slave with the same server_uuid/server_id as this slave has 
connected to the master;
flink version:flink-1.13.5
cdc version:2.1.1

在使用flinkcdc同步多个表时遇到报错:
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=1)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 3 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)

转发: flink创建视图后,SELECT语句后使用OPTIONS报错

2022-02-16 文章 liangjinghong
感谢老师的回复,然而我的部署环境下的lib中没有您说的这个包,请问是要移除哪个包呢?

我的lib下有的包:
flink-csv-1.13.0.jar
flink-dist_2.11-1.13.0.jar
flink-json-1.13.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-mysql-cdc-2.1.1.jar
flink-table_2.11-1.13.0.jar
flink-table-blink_2.11-1.13.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
-邮件原件-
发件人: godfrey he [mailto:godfre...@gmail.com] 
发送时间: 2022年2月16日 16:47
收件人: user-zh 
主题: Re: flink创建视图后,SELECT语句后使用OPTIONS报错

Hi liangjinghong,

原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。

Best,
Godfrey

liangjinghong  于2022年2月14日周一 17:26写道:

> 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
>
> 代码:
>
> CREATE VIEW used_num_common
>
> (toolName,region,type,flavor,used_num)
>
> AS
>
> select info.toolName as toolName,r.regionName as
> region,f.type,f.flavor,count(1) as used_num from
>
> tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
>
> join
>
> tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
>
> on job.jobId = task.jobId
>
> join
>
> tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
>
> on task.nodeId = node.id
>
> join
>
> tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
>
> on job.appId = info.appId
>
> join
>
> tbl_region r
>
> on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
>
> join
>
> tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
>
> on node.resourcesSpec = f.flavor
>
> where job.jobStatus in ('RUNNING','ERROR','INITING')
>
> and task.taskStatus in ('RUNNING','ERROR','INITING')
>
> and node.machineStatus <> 'DELETED'
>
> and toolName is not null
>
> group by info.toolName,r.regionName,f.type,f.flavor
>
> …
>
> 打包部署后报错如下:
>
> The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> SPECIAL
>
> 2022-02-08 13:33:39,350 WARN
> org.apache.flink.client.deployment.application.DetachedApplicationRunn
> er []
> - Could not execute application:
>
> org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: class org.apache.calcite.sql.SqlSyntax$6: 
> SPECIAL
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(Package
> dProgram.java:372)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeF
> orExecution(PackagedProgram.java:222)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:11
> 4)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunn
> er.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunn
> er.run(DetachedApplicationRunner.java:70)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$hand
> leRequest$0(JarRunHandler.java:102)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFutu
> re.java:1700)
> [?:?]
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515
> )
> [?:?]
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.r
> un(ScheduledThreadPoolExecutor.java:304)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.j
> ava:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.
> java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Caused by: java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at 
> org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at 
> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:19
> 9)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at 
> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:19
> 9)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at 
> 

转发:FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 文章 张立志



退订

- 转发的邮件 -

发件人: Luna Wong
发送日期: 2021年03月31日 21:45
收件人: user-zh
主题: FLIP-146中TableSource并行度设置预计哪个版本做?
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


转发:在FlinkKafkaProducer获取sink表的建表key

2021-03-29 文章 Jimmy Zhang





|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: Jimmy Zhang
发送日期: 2021年03月29日 14:05
收件人: Qishang
抄送人:
主题: 回复:在FlinkKafkaProducer获取sink表的建表key
Hi,Qiishang,非常感谢你的回复,我看了你说的代码,应该是可以解决我的需求,不过我还没有细看,因为本身这个需求可能涉及Dynamic这块不多。
另外,我已经通过这种方法成功解决该问题。
1.我发现FlinkKafkaProducer是在KafkaTableSink.createKafkaProducer中进行构造的
2.KafkaTableSink继承于KafkaTableSinkBase,而在后者中,有TableSchema类作为成员变量,而TableSchema有getFieldNames方法获取到sink表字段名字,KafkaTableSinkBase封装了这个方法,名字一样,返回值是一个String[],这正是我需要的。
3.我在KafkaTableSink.createKafkaProducer中利用super.getFieldNames获取到String[],并新创建一个FlinkKafkaProducer的构造函数,将参数传入,达到我的目的。




|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

在2021年03月29日 11:26,Qishang 写道:
Hi Jimmy.
FlinkKafkaProducer 里面是没有的,可以试着从  KafkaDynamicSink 里面传到 FlinkKafkaProducer
中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType
这个里面可以拿到

Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道:

> Hi!大家好。
> 目前有一个需求,需要获取Kafka
> sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
> 例如:CREATE TABLE kafkaTable (
>
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> )
> 想获取到   user_id, item_id ,category_id ,behavior这四个字段。
>
>
> | |
> Jimmy Zhang
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制


转发:flink 设置broadcastStream 的MapStateDescriptor

2021-01-17 文章 smq
发自我的iPhone


-- 原始邮件 --
发件人: 明启 孙 <374060...@qq.com
发送时间: 2021年1月18日 11:30
收件人: user-zh 

Re: 转发:flink-sql字段类型问题

2021-01-14 文章 yinghua...@163.com

[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Found 1 items
-rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
   // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
  //我停止任务后再去查询时,这个目录已经删除了,出错如下
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
ls: 
`hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
 No such file or directory//出错信息



yinghua...@163.com
 
发件人: 郝文强
发送时间: 2021-01-14 17:24
收件人: user-zh
主题: 转发:flink-sql字段类型问题
 
 
 
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 
 
 
- 转发邮件信息 -
 
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:23
发送至: d...@flink.apache.org 
主题: 转发:flink-sql字段类型问题
 
 
 
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 
 
 
----- 转发邮件信息 -
 
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:22
发送至: dev-h...@flink.apache.org 
主题: flink-sql字段类型问题
sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
麻烦各位帮看一下
 
 
源数据表是 mysql的information_schema.tables 表
表结构如下:
table_catalog varchar(64)
table_schema  varchar(64)
table_name  varchar(64)
table_type  enum('base table','view','system view')
engine  varchar(64)
version int
row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
table_rows  bigint unsigned
avg_row_length  bigint unsigned
data_length bigint unsigned
max_data_length bigint unsigned
index_length  bigint unsigned
data_free bigint unsigned
auto_increment  bigint unsigned
create_time timestamp
update_time datetime
check_time  datetime
table_collation varchar(64)
checksum  bigint
create_options  varchar(256)
table_comment text
我的flink sql 建表语句:
   CREATE TABLE info_table (
  TABLE_CATALOG STRING,
  TABLE_SCHEMA STRING,
  TABLE_NAME STRING,
  TABLE_TYPE STRING,
  ENGINE STRING,
  VERSION INT,
  ROW_FORMAT STRING,
  TABLE_ROWS BIGINT,
  AVG_ROW_LENGTH BIGINT,
  DATA_LENGTH BIGINT,
  MAX_DATA_LENGTH BIGINT,
  INDEX_LENGTH BIGINT,
  DATA_FREE BIGINT,
  AUTO_INCREMENT BIGINT,
  CREATE_TIME TIMESTAMP,
  UPDATE_TIME TIMESTAMP,
  CHECK_TIME TIMESTAMP,
  TABLE_COLLATION STRING,
  CHECKSUM INTEGER,
  CREATE_OPTIONS STRING,
  TABLE_COMMENT STRING,
  PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/information_schema',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'TABLES'
);
 
 
反复改了几次类型都报错: 
 
java.math.BigInteger cannot be cast to java.lang.Integer
 
java.lang.Long cannot be cast to java.math.BigDecimal
 
java.lang.Long cannot be cast to java.lang.Integer
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 


转发:flink-sql字段类型问题

2021-01-14 文章 郝文强




| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



- 转发邮件信息 -

发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:23
发送至: d...@flink.apache.org 
主题: 转发:flink-sql字段类型问题




| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



----- 转发邮件信息 -

发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:22
发送至: dev-h...@flink.apache.org 
主题: flink-sql字段类型问题
sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
麻烦各位帮看一下


源数据表是 mysql的information_schema.tables 表
表结构如下:
table_catalog varchar(64)
table_schema  varchar(64)
table_name  varchar(64)
table_type  enum('base table','view','system view')
engine  varchar(64)
version int
row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
table_rows  bigint unsigned
avg_row_length  bigint unsigned
data_length bigint unsigned
max_data_length bigint unsigned
index_length  bigint unsigned
data_free bigint unsigned
auto_increment  bigint unsigned
create_time timestamp
update_time datetime
check_time  datetime
table_collation varchar(64)
checksum  bigint
create_options  varchar(256)
table_comment text
我的flink sql 建表语句:
   CREATE TABLE info_table (
  TABLE_CATALOG STRING,
  TABLE_SCHEMA STRING,
  TABLE_NAME STRING,
  TABLE_TYPE STRING,
  ENGINE STRING,
  VERSION INT,
  ROW_FORMAT STRING,
  TABLE_ROWS BIGINT,
  AVG_ROW_LENGTH BIGINT,
  DATA_LENGTH BIGINT,
  MAX_DATA_LENGTH BIGINT,
  INDEX_LENGTH BIGINT,
  DATA_FREE BIGINT,
  AUTO_INCREMENT BIGINT,
  CREATE_TIME TIMESTAMP,
  UPDATE_TIME TIMESTAMP,
  CHECK_TIME TIMESTAMP,
  TABLE_COLLATION STRING,
  CHECKSUM INTEGER,
  CREATE_OPTIONS STRING,
  TABLE_COMMENT STRING,
  PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/information_schema',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'TABLES'
);


反复改了几次类型都报错: 

java.math.BigInteger cannot be cast to java.lang.Integer

java.lang.Long cannot be cast to java.math.BigDecimal

java.lang.Long cannot be cast to java.lang.Integer

| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



Re: 转发: 两条流去重后再关联出现不符合预期数据

2020-12-15 文章 hdxg1101300...@163.com
我是这样想的,因为最后的两条流关联是 两条结果流的关联,两条结果流 
都属于回撤流,任何一边变化都是2条消息;对于左侧第一条就是回撤,第二条就是变化后的;但是右边发生变化 则会有两条数据,false消息 和左边关联 
认为变化整个流表示变化回撤再显示关联后的数据;true数据来了再次关联 认为整个流变化;撤回再关联发出;
我的想法是可不可以 之和右边流为true的数据关联;



hdxg1101300...@163.com
 
发件人: hdxg1101300123
发送时间: 2020-12-15 23:44
收件人: user-zh
主题: 转发: 两条流去重后再关联出现不符合预期数据
-- 转发的邮件 --
发件人:hdxg1101300123 
日期:2020年12月15日 10:36
主题:两条流去重后再关联出现不符合预期数据
收件人:user-zh 
抄送:
 
> 你好: 
> 我在使用flink 
> 1.11.2版本的时候使用flinksql处理两条流。因为两条流都是数据库变更信息,我需要取最新的数据关联;所以分别对两条流做row_number=1 
> (SELECT [column_list] FROM ( 
>SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>ORDER BY time_attr [asc|desc]) AS rownum
>FROM table_name) 
> WHERE rownum = 1) 
> 去重后再左关联; 
> 前期当左流变更都没有问题,结果符合预期;当右流有数据时,第一条数据也符合预期,但是右流在发送一条数据出现变更时,出现了一条不符合预期的数据; 
> 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)
>  
>
> 第1行左流来了数据显示true,此时右流没有数据结果是null; 
> 第2行右流来了数据,显示为true(单独打印了右流的结果); 
> 第3行显示左流撤回; 
> 第4行 左右流数据关联上,正常显示; 
> 第5行 左流数据变更,数据撤回; 
> 第6行 显示变更后的数据; 
> 第7行 右流数据变化,数据撤回; 
> 第8行 显示右流最新的结果; 
> 第9行 因为右流数据变化 所以左流(关联数据)撤回; 
> 第10行 和第11 行 不符合预期; 
> 正常应该是 右流发生变化 第9行 因为右流数据变化 所以左流(关联数据)撤回;然后右流的最新数据和左流产生结果;显示第12行数据才对; 
> 所以想请教一下大家; 
>
> 1607998361520> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> 1607998361520> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
>
> 我的sql语句如下 
> String sql = "SELECT a.sheetId 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
> " 
> sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
>  " + 
> " from (SELECT 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
> " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" 
> + 
> " FROM (SELECT *," + 
> " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
> "   FROM sheetMain)" + 
> " WHERE rownum = 1 ) a" + 
> " left JOIN " + 
> " (select sheetId,provided,satisfied,score,operateTime from (SELECT 
> *," + 
> " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
> "   FROM sheetAnswers)" + 
> " WHERE rownum = 1 ) c" + 
> " ON a.sheetId = c.sheetId " ; 
>
>
>
> hdxg1101300...@163.com 


转发: 两条流去重后再关联出现不符合预期数据

2020-12-15 文章 hdxg1101300123
-- 转发的邮件 --
发件人:hdxg1101300123 
日期:2020年12月15日 10:36
主题:两条流去重后再关联出现不符合预期数据
收件人:user-zh 
抄送:

> 你好: 
>     我在使用flink 
> 1.11.2版本的时候使用flinksql处理两条流。因为两条流都是数据库变更信息,我需要取最新的数据关联;所以分别对两条流做row_number=1 
> (SELECT [column_list] FROM ( 
>    SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>    ORDER BY time_attr [asc|desc]) AS rownum
>    FROM table_name) 
> WHERE rownum = 1) 
> 去重后再左关联; 
> 前期当左流变更都没有问题,结果符合预期;当右流有数据时,第一条数据也符合预期,但是右流在发送一条数据出现变更时,出现了一条不符合预期的数据; 
>     
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)
>  
>
> 第1行左流来了数据显示true,此时右流没有数据结果是null; 
> 第2行右流来了数据,显示为true(单独打印了右流的结果); 
> 第3行显示左流撤回; 
> 第4行 左右流数据关联上,正常显示; 
> 第5行 左流数据变更,数据撤回; 
> 第6行 显示变更后的数据; 
> 第7行 右流数据变化,数据撤回; 
> 第8行 显示右流最新的结果; 
> 第9行 因为右流数据变化 所以左流(关联数据)撤回; 
> 第10行 和第11 行 不符合预期; 
> 正常应该是 右流发生变化 第9行 因为右流数据变化 所以左流(关联数据)撤回;然后右流的最新数据和左流产生结果;显示第12行数据才对; 
> 所以想请教一下大家; 
>
> 1607998361520> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> 1607998361520> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
>
> 我的sql语句如下 
> String sql = "SELECT a.sheetId 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " 
> sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
>  " + 
>     " from (SELECT 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" 
> + 
>     " FROM (SELECT *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetMain)" + 
>     " WHERE rownum = 1 ) a" + 
>     " left JOIN " + 
>     " (select sheetId,provided,satisfied,score,operateTime from (SELECT 
> *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetAnswers)" + 
>     " WHERE rownum = 1 ) c" + 
>     " ON a.sheetId = c.sheetId " ; 
>
>
>
> hdxg1101300...@163.com 


转发:flink1.11流式写入hive速度慢的问题

2020-10-09 文章 me
dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年10月9日(周五) 15:34
主题: flink1.11流式写入hive速度慢的问题


flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)

转发:Sql-client的checkpoint问题

2020-08-06 文章 king


Checkpoint只生成了shared和taskowned目录,没有chk,望解答,谢谢
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 09:05
收件人: user-zh
主题: 转发:Sql-client的checkpoint问题


抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 08:23
收件人: user-zh
主题: Sql-client的checkpoint问题


您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file 
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢


以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

转发:Sql-client的checkpoint问题

2020-08-06 文章 king


抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 08:23
收件人: user-zh
主题: Sql-client的checkpoint问题


您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file 
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢


以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

回复:转发:flink1.10整合hbase测试遇到的问题

2020-06-03 文章 Lijie Wang
这个是因为这个 class 不在路径中导致的。你需要确认一下这个 class 在哪个 jar 包中,这个 jar 包是否在 flink 的 lib 下




在2020年06月3日 22:52,liunaihua521 写道:





- 转发邮件信息 -

发件人: liunaihua521 
发送日期: 2020年6月3日 22:18
发送至: user-zh-i...@flink.apache.org  、 
user-zh-...@flink.apache.org 
主题: flink1.10整合hbase测试遇到的问题
hi!
版本说明:
flink版本1.10
HBase版本2.2.4
ZK版本3.6.1
Hadoop版本2.10.0


程序说明:

程序是简单的实现RichSourceFunction和RichSinkFunction,读取和写入hbase,程序打包后上传standalone模式的集群.


报错说明:
提交任务后,总是报如下错误(附件附文本):

或者



尝试如下:
尝试一:
flink的lib下有如下jar包:
提交的jar包中发现没有下面连个类
执行后报错


尝试二:
将guava-11.0.2.jar包移动到hadoop的lib下,再次执行依然报错


尝试结果:
反复尝试都一致报错,求大神们指点,再此先谢谢了!


回复:转发:flink1.10整合hbase测试遇到的问题

2020-06-03 文章 1048262223
Hi


java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.ClassNotFoundException: 
org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader
根据报错看,这个guava的cacheloader应该是被shade到hbase的jar包中的,你可以查看下hbase相关的jar包有没有shade进去guava的这些类。
Best,Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: liunaihua521 

转发:Re: sql关键字问题

2020-03-18 文章 lucas.wu
原始邮件
发件人:lucas.wulucas...@xiaoying.com
收件人:imj...@gmail.com
发送时间:2020年3月18日(周三) 17:21
主题:Re: sql关键字问题


Hi,jark
看到了你修复的这个jirahttps://issues.apache.org/jira/browse/FLINK-16526
但是看了你的代码和描述,你只是针对SqlBasicCall这种node的字段名加了`` 
,也就是说只会对有computed_column_expression的字段加上,但是对于普通的字段并没有覆盖到,请问我理解的正确吗?


原始邮件
发件人:Kurt youngykt...@gmail.com
收件人:user-zhuser...@flink.apache.org
抄送:Yuzhao chenyuzhao@gmail.com
发送时间:2020年3月18日(周三) 16:41
主题:Re: sql关键字问题


好像已经有了,应该是这个jira: https://issues.apache.org/jira/browse/FLINK-16526 Best, Kurt 
On Wed, Mar 18, 2020 at 4:19 PM Jingsong Li jingsongl...@gmail.com wrote:  Hi 
lucas,   赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。  CC: @Yuzhao Chen 
yuzhao@gmail.com   Best,  Jingsong Lee   On Wed, Mar 18, 2020 at 4:15 PM 
lucas.wu lucas...@xiaoying.com wrote:初步找到了原因   
原来我的建表语句用了computed_column_expression 这种语义。   然后flink内部在使用的时候其实是把它转成了select 语句   
...   if (columnExprs.nonEmpty) {   val fieldExprs = fieldNames   .map { name = 
  if (columnExprs.contains(name)) {   columnExprs(name)   } else {   name   }   
}.toArray   val rexNodes =   
toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)   ….. 
  然后我们看看convertToRexNodes方法   public RexNode[] 
convertToRexNodes(String[] exprs) {   ….   String query = 
String.format(QUERY_FORMAT, String.join(",", exprs));   SqlNode parsed = 
planner.parser().parse(query);   }   重点就在这个QUERY_FORMAT   private static 
final String QUERY_FORMAT = "SELECT %s FROM " +   TEMPORARY_TABLE_NAME;   
这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。   所以这个是算flink的bug吗?   原始邮件   
发件人:lucas.wulucas...@xiaoying.com   收件人:user-zhuser...@flink.apache.org   
发送时间:2020年3月18日(周三) 15:36   主题:sql关键字问题   create table `source_table`( 
`SeqNo` varchar, `Type` varchar, `Table`   varchar, `ServerId` varchar, 
`Database` varchar, `OldData` varchar,  `GTID`   varchar, `Offset` varchar, 
`event_ts` as   
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),   
WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句   
Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。  SQL   
parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts`  as   
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),   
WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉   
,就正常了。是我的使用方法有问题吗? --  Best, Jingsong Lee

转发:(无主题)

2020-03-18 文章 酷酷的浑蛋




| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

- 转发邮件信息 -

发件人: 酷酷的浑蛋 
发送日期: 2020年3月18日 15:15
发送至: user-zh 


现在我发现个问题:flink sql实时 inner join ,结果会发生乱序,请问这是正常的吗
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

转发:(无主题)

2020-03-13 文章 酷酷的浑蛋




| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

- 转发邮件信息 -

发件人: 酷酷的浑蛋 
发送日期: 2020年3月13日 19:18
发送至: user-zh@flink.apache.org 
select a.x01,
   udf_name(a.x02)
from (select a.x01,
 a...
  from tb_name) a
  join
  (select * from tb_name) b
  on a.xx = b.xx
这是一段flink实时sql,其中总是偶尔有几条数据乱序,请问大佬们遇到过吗?或者该怎么解决?
| |
王太阳
|
|
apach...@163.com
|
签名由网易邮箱大师定制