Re: Custom partitioner: problem when using it, code attached

2021-02-21 Post
Hi!

Optional.of(new customPartitioner())
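
The constructor you are calling takes Optional<FlinkKafkaPartitioner<IN>>, not the partitioner instance itself, which is why type inference fails. A minimal corrected sketch, keeping your class names (the only changes are the Optional wrapping and the java.util.Optional import):

import java.util.Optional;

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "test_topic",
        new SimpleStringSchema(),
        properties,
        Optional.of(new customPartitioner())); // wrap the partitioner in an Optional
stream.addSink(myProducer);

Passing Optional.empty() instead falls through to customPartitioner.orElse(null) in the constructor you quoted, i.e. no custom partitioning.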



Ye Chen wrote
> Hi all, I want to implement a custom partitioner. After extending FlinkKafkaPartitioner I get an error
> when using it; the simplified code is below.
> // Custom partitioner
> public class customPartitioner extends FlinkKafkaPartitioner<String> {
>     @Override
>     public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
>         return 0;
>     }
> }
> 
> 
> DataStream<String> stream = ...;
> FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
>         "test_topic",
>         new SimpleStringSchema(),
>         properties,
>         new customPartitioner()
> );
> stream.addSink(myProducer);
> 
> 
> // The code above fails to compile in the IDE at the FlinkKafkaProducer call: [Error:(55, 49) java:
> // cannot infer type arguments for org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>]
> // If I remove new customPartitioner() and do not use the custom partitioner, FlinkKafkaProducer compiles
> // without error. It feels like no constructor matches, but the source does contain this constructor.
> 
> 
> 
> 
> The FlinkKafkaProducer source is below. Is there anything wrong with how I wrote it?
> public FlinkKafkaProducer(
>         String topicId,
>         SerializationSchema<IN> serializationSchema,
>         Properties producerConfig,
>         Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         topicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner.orElse(null),
>         Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }







Re: Integrating Flink jobs with our own platform

2021-02-02 Post
Hi

This can be broken into two steps:
1. Generate the JobGraph; see toJobGraph() in org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.
2. Submit the JobGraph to Yarn; see deployJobCluster() in org.apache.flink.yarn.YarnClusterDescriptor.
Note: based on 1.11.x; a rough sketch follows below.
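
A rough, untested sketch against 1.11.x. The jar path, memory sizes, and parallelism are placeholders, and the Configuration would still need your YARN/HDFS and flink-dist settings filled in:

import java.io.File;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class PortalSubmit {
    public static void main(String[] args) throws Exception {
        Configuration flinkConfig = new Configuration(); // plus YARN/HDFS/flink-dist options

        // Step 1: build a JobGraph from the uploaded jar
        // (this is what JarHandlerUtils.toJobGraph() does internally).
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/uploaded-job.jar")) // placeholder
                .build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                program, flinkConfig, 1 /* default parallelism */, false);

        // Step 2: deploy the JobGraph as a per-job cluster on YARN.
        YarnConfiguration yarnConfig = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();
        try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
                flinkConfig,
                yarnConfig,
                yarnClient,
                YarnClientYarnClusterInformationRetriever.create(yarnClient),
                true /* shared YarnClient */)) {
            ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                    .setMasterMemoryMB(1024)
                    .setTaskManagerMemoryMB(1024)
                    .setSlotsPerTaskManager(1)
                    .createClusterSpecification();
            descriptor.deployJobCluster(spec, jobGraph, true /* detached */);
        }
    }
}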


Jacob wrote
> I have a somewhat vague requirement and am not sure whether it is reasonable.
> 
> At the moment our real-time jobs all run on the hadoop cluster in On-Yarn mode, and every new job is
> submitted from under the Flink client with ./bin/flink run-application -t yarn-application ...
> 
> We have a self-developed data-processing platform, and the flink job is one stage of its data
> processing. We are wondering whether we could add a menu in our portal to upload the flink project jar
> and submit the job to the hadoop cluster, so everything is managed in one place and we no longer have
> to go to a flink client to submit each time. Is this kind of requirement reasonable?
> 
> I imagine that to submit jobs from our own platform we would first have to integrate the flink client
> into our system; otherwise how would the job be launched and run?
> 
> The requirement is rather vague; please bear with me.
> 
> 
> 
> -
> Thanks!
> Jacob






Re: Starting a flink job from a crontab script fails: flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-03 Post
hi!

java.io.FileNotFoundException: File file:/home/xjia/.flink/...
This shows the jar is being loaded from the local filesystem rather than from hdfs.

I think this is probably a hadoop environment problem that makes the scheme resolve to file; check your environment with echo $HADOOP_CLASSPATH.

Important: Make sure that the HADOOP_CLASSPATH environment variable is set up
(it can be checked by running echo $HADOOP_CLASSPATH). If not, set it up as
described in

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html

(the command suggested there is export HADOOP_CLASSPATH=`hadoop classpath`).






Re: flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler

2020-12-24 Post
hi!

You can try changing the configuration file (flink-conf.yaml):
classloader.resolve-order: parent-first

Or you can try the utility class
org.apache.flink.table.runtime.generated.CompileUtils
(a small example is sketched below)
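
An illustrative sketch of that utility. I am assuming the compile(ClassLoader, String, String) signature from the 1.11/1.12 blink table runtime, and the Hello class is made up for the demo, so please verify against your version:

import org.apache.flink.table.runtime.generated.CompileUtils;

public class CompileUtilsDemo {
    public static void main(String[] args) throws Exception {
        String source = "public class Hello { public static String greet() { return \"hi\"; } }";
        // Compiles generated Java source with Janino against the given classloader;
        // routing compilation through this class can sidestep compiler classloading conflicts.
        Class<?> clazz = CompileUtils.compile(
                CompileUtilsDemo.class.getClassLoader(), "Hello", source);
        System.out.println(clazz.getMethod("greet").invoke(null)); // prints: hi
    }
}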





Re: pyflink 1.12: error when using the connector read.query parameter

2020-12-24 Post
hi! Try this:

CREATE TABLE source_table (
    yldrate DECIMAL,
    pf_id VARCHAR,
    symbol_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip/db',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'TS_PF_SEC_YLDRATE',
    'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TS_PF_SEC_YLDRATE
        LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE
        WHERE CCY_TYPE = "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042"
        AND BIZ_DATE BETWEEN "20160701" AND "20170307"'
)





Re: Flink 1.11.1 streaming write to filesystem: partition commit problem

2020-12-24 Post
Have you enabled checkpointing?

Part files can be in one of three states:

In-progress: the part file that is currently being written to is in-progress
Pending: closed (due to the specified rolling policy) in-progress files that are waiting to be committed
Finished: on successful checkpoints (STREAMING) or at the end of input (BATCH), pending files transition to "Finished"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
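
If checkpointing is not enabled, pending files never transition to finished, so partitions are never committed. A minimal DataStream sketch (the output path and interval are placeholders; the enableCheckpointing line is the point):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class FsSinkCheckpointDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pending part files are committed only when a checkpoint succeeds,
        // so without this line the sink never finishes any files.
        env.enableCheckpointing(60_000L); // e.g. every 60 seconds

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("filesystem-sink-demo");
    }
}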



