Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Rafi Aroch
Hi,

This happens because StreamingFileSink does not support a finite input
stream.
In the docs it's mentioned under "Important Considerations":

[screenshot of the "Important Considerations" note from the StreamingFileSink documentation]

This behaviour often surprises users...

There's a FLIP and an issue about fixing this. I'm not sure what the status
is though, maybe Kostas can share.
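
For reference, a minimal sketch of enabling checkpointing, which the StreamingFileSink needs
in order to finalize its part files (for a truly finite input the trailing files can still stay
in-progress until the fix referenced above lands); class and interval are illustrative only:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // StreamingFileSink only finalizes its in-progress/pending part files
        // when a checkpoint completes, so checkpointing has to be enabled.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder pipeline; the bulk-format sink from this thread would be
        // attached to a real source instead of print().
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointing sketch");
    }
}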

Thanks,
Rafi


On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Dawid and Kostas,
>
> Sorry for the late reply + thank you for the troubleshooting. I put
> together an example repo that reproduces the issue[1], because I did have
> checkpointing enabled in my previous case -- still must be doing something
> wrong with that config though.
>
> Thanks!
> Austin
>
> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>
>
> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
> wrote:
>
>> Hi Austin,
>>
>> Dawid is correct in that you need to enable checkpointing for the
>> StreamingFileSink to work.
>>
>> I hope this solves the problem,
>> Kostas
>>
>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>  wrote:
>> >
>> > Hi Austin,
>> >
>> > If I am not mistaken the StreamingFileSink by default flushes on
>> checkpoints. If you don't have checkpoints enabled it might happen that not
>> all data is flushed.
>> >
>> > I think you can also adjust that behavior with:
>> >
>> > forBulkFormat(...)
>> >
>> > .withRollingPolicy(/* your custom logic */)
>> >
>> > I also cc Kostas who should be able to correct me if I am wrong.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>> >
>> > Hi there,
>> >
>> > Using Flink 1.9.1, trying to write .tgz files with the
>> StreamingFileSink#BulkWriter. It seems like flushing the output stream
>> doesn't flush all the data written. I've verified I can create valid files
>> using the same APIs and data on their own, so thinking it must be something
>> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
>> with the `file://` protocol.
>> >
>> > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
>> version 1.20.
>> >
>> > Here's a runnable example of the issue:
>> >
>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>> > import
>> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>> > import org.apache.flink.api.common.serialization.BulkWriter;
>> > import org.apache.flink.core.fs.FSDataOutputStream;
>> > import org.apache.flink.core.fs.Path;
>> > import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> > import
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> >
>> > import java.io.FileOutputStream;
>> > import java.io.IOException;
>> > import java.io.Serializable;
>> > import java.nio.charset.StandardCharsets;
>> >
>> > class Scratch {
>> >   public static class Record implements Serializable {
>> > private static final long serialVersionUID = 1L;
>> >
>> > String id;
>> >
>> > public Record() {}
>> >
>> > public Record(String id) {
>> >   this.id = id;
>> > }
>> >
>> > public String getId() {
>> >   return id;
>> > }
>> >
>> > public void setId(String id) {
>> >   this.id = id;
>> > }
>> >   }
>> >
>> >   public static void main(String[] args) throws Exception {
>> > final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >
>> > TarArchiveOutputStream taos = new TarArchiveOutputStream(new
>> GzipCompressorOutputStream(new
>> FileOutputStream("/home/austin/Downloads/test.tgz")));
>> > TarArchiveEntry fileEntry = new
>> TarArchiveEntry(String.format("%s.txt", "test"));
>> > String fullText = "hey\nyou\nwork";
>> > byte[] fullTextData = fullText.getBytes();
>> > fileEntry.setSize(fullTextData.length);
>> > taos.putArchiveEntry(fileEntry);
>> > taos.write(fullTextData, 0, fullTextData.length);
>> > taos.closeArchiveEntry();
>> > taos.flush();
>> > taos.close();
>> >
>> > StreamingFileSink textSink = StreamingFileSink
>> > .forBulkFormat(new
>> Path("file:///home/austin/Downloads/text-output"),
>> > new BulkWriter.Factory() {
>> >   @Override
>> >   public BulkWriter create(FSDataOutputStream out)
>> throws IOException {
>> > final TarArchiveOutputStream compressedOutputStream =
>> new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>> >
>> > return new BulkWriter() {
>> >   @Override
>> >   public void addElement(Record record) throws
>> IOException {
>> > TarArchiveEntry fileEntry = new
>> TarArchiveEntry(String.format("%s.txt", 

Re: CliFrontend does not load classes from the user jar first

2020-03-02 Thread tison
https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774

Best,
tison.


tison wrote on Tue, Mar 3, 2020 at 2:13 PM:

> This issue has already been fixed in 1.9.2 and 1.10; for the change see
>
> https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> tison.
>
>
> aven.wu wrote on Tue, Mar 3, 2020 at 2:04 PM:
>
>> Component versions: Hadoop 2.7.3, flink 1.9.1, elasticsearch 6.5.
>> The problem started because my user program uses Jackson and also depends on
>> the Elasticsearch rest client; when submitting the job to the Yarn cluster I got the following exception:
>> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>> at
>> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
>> After searching online I suspected a jackson version conflict, so I printed the class load path:
>> --main class jackson class load before
>> run--
>> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
>> Sure enough, version 2.2.3 was loaded from the hadoop classpath.
>>
>> I then looked at the entry point of the flink run command:
>> CliFrontend#buildProgram line 799
>> PackagedProgram#PackagedProgram line 221
>> JobWithJars#BuildUserCodeClassLoad line 142
>> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
>> By default parentFirst is used; according to the inverted class loading described in the
>> official docs, classes are loaded from the classpath first rather than from the user jar.
>> How can I change the class loading order here so that classes are loaded from the user jar first?
>>
>> Best
>> Aven
>>
>>


Re: CliFrontend does not load classes from the user jar first

2020-03-02 Thread tison
This issue has already been fixed in 1.9.2 and 1.10; for the change see

https://issues.apache.org/jira/browse/FLINK-13749
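
On those versions, the client then honors the configured class loading order; a minimal
flink-conf.yaml sketch (assuming you want classes from the user jar, e.g. your jackson
version, to win over the Hadoop classpath):

# flink-conf.yaml
classloader.resolve-order: child-first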

Best,
tison.


aven.wu wrote on Tue, Mar 3, 2020 at 2:04 PM:

> Component versions: Hadoop 2.7.3, flink 1.9.1, elasticsearch 6.5.
> The problem started because my user program uses Jackson and also depends on
> the Elasticsearch rest client; when submitting the job to the Yarn cluster I got the following exception:
> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
> at
> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
> After searching online I suspected a jackson version conflict, so I printed the class load path:
> --main class jackson class load before
> run--
> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
> Sure enough, version 2.2.3 was loaded from the hadoop classpath.
>
> I then looked at the entry point of the flink run command:
> CliFrontend#buildProgram line 799
> PackagedProgram#PackagedProgram line 221
> JobWithJars#BuildUserCodeClassLoad line 142
> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
> By default parentFirst is used; according to the inverted class loading described in the
> official docs, classes are loaded from the classpath first rather than from the user jar.
> How can I change the class loading order here so that classes are loaded from the user jar first?
>
> Best
> Aven
>
>


CliFrontend does not load classes from the user jar first

2020-03-02 Thread aven . wu
Component versions: Hadoop 2.7.3, flink 1.9.1, elasticsearch 6.5.
The problem started because my user program uses Jackson and also depends on the Elasticsearch rest client; when submitting the job to the Yarn cluster I got the following exception:
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
at 
org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
After searching online I suspected a jackson version conflict, so I printed the class load path:
--main class jackson class load before 
run--
file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
Sure enough, version 2.2.3 was loaded from the hadoop classpath.

I then looked at the entry point of the flink run command:
CliFrontend#buildProgram line 799
PackagedProgram#PackagedProgram line 221
JobWithJars#BuildUserCodeClassLoad line 142 
return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
By default parentFirst is used; according to the inverted class loading described in the official docs, classes are loaded from the classpath first rather than from the user jar.
How can I change the class loading order here so that classes are loaded from the user jar first?

Best
Aven



Re: Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Lu Weizheng
Thanks a lot, hope it will be fixed soon!

From: Jark Wu 
Sent: March 3, 2020, 11:25
To: Lu Weizheng 
Cc: user@flink.apache.org 
Subject: Re: Table API connect method timestamp watermark assignment problem

Hi Lu,

DDL and Schema descriptor do not share the same code path. I guess the reason 
why Schema descriptor doesn't work is because of FLINK-16160.
We will fix that in the next minor release. Please use DDL to define the watermark, 
which is also the suggested way to do it.
The current Schema descriptor will be refactored to share the same code path of 
DDL in the near future.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160

On Tue, 3 Mar 2020 at 10:09, Lu Weizheng  wrote:
Hey guys,

I have been using the Flink Table API recently. I want to use event time with a Kafka 
Table Connector. I think Flink cannot recognize the event-time timestamp 
field in my code. Here is my code:

public static void main(String[] args) throws Exception {

EnvironmentSettings fsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
fsSettings);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

tEnv
// connect to the external system with the connect() method
.connect(
new Kafka()
.version("universal") // required; valid values are "0.8", "0.9", "0.10", "0.11" or "universal"
.topic("user_behavior")   // required; topic name
.startFromLatest()// where to start reading when consuming for the first time
.property("zookeeper.connect", "localhost:2181")  // Kafka connection properties
.property("bootstrap.servers", "localhost:9092")
)
// serialization format, e.g. JSON or Avro
.withFormat(new Json())
// schema of the data
.withSchema(
new Schema()
.field("user_id", DataTypes.BIGINT())
.field("item_id", DataTypes.BIGINT())
.field("category_id", DataTypes.BIGINT())
.field("behavior", DataTypes.STRING())
.field("ts", DataTypes.TIMESTAMP(3))
.rowtime(new 
Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
)
// name of the temporary table, which can be referenced in later SQL statements
.createTemporaryTable("user_behavior");

Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
"\tuser_id, \n" +
"\tCOUNT(behavior) AS behavior_cnt, \n" +
"\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
"FROM user_behavior\n" +
"GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
DataStream> result = 
tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
result.print();

env.execute("table api");
}

As shown in the code above, I use rowtime() method when I want to define a 
Schema. When I try to run, I get the following error: Window aggregate can only 
be defined over a time attribute column, but TIMESTAMP(3) encountered.

I tried another method based on DDL, and it worked. So it is not my Kafka 
source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
//"proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 
在ts上定义watermark,ts成为事件时间列\n" +
") WITH (\n" +
"'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
"'connector.version' = 'universal',  -- kafka 版本,universal 
支持 0.11 以上的版本\n" +
"'connector.topic' = 'user_behavior',  -- kafka topic\n" +
"'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 
开始读取\n" +
"'connector.properties.zookeeper.connect' = 
'localhost:2181',  -- zookeeper 地址\n" +
"'connector.properties.bootstrap.servers' = 
'localhost:9092',  -- kafka broker 地址\n" +
"'format.type' = 'json'  -- 数据源格式为 json\n" +
")");

Hope anyone can give some suggestions. Thanks.


Re: Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Jark Wu
Hi Lu,

DDL and Schema descriptor do not share the same code path. I guess the
reason why Schema descriptor doesn't work is because of FLINK-16160.
We will fix that in the next minor release. Please use DDL to define
the watermark, which is also the suggested way to do it.
The current Schema descriptor will be refactored to share the same code
path of DDL in the near future.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160

On Tue, 3 Mar 2020 at 10:09, Lu Weizheng  wrote:

> Hey guys,
>
> I have been using the Flink Table API recently. I want to use event time with
> a Kafka Table Connector. I think Flink cannot recognize the event-time
> timestamp field in my code. Here is my code:
>
> public static void main(String[] args) throws Exception {
>
> EnvironmentSettings fsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> fsSettings);
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> tEnv
> // connect to the external system with the connect() method
> .connect(
> new Kafka()
> .version("universal") // required; valid values are "0.8", "0.9",
> "0.10", "0.11" or "universal"
> .topic("user_behavior")   // required; topic name
> .startFromLatest()// where to start reading when consuming for the first time
> .property("zookeeper.connect", "localhost:2181")  //
> Kafka connection properties
> .property("bootstrap.servers", "localhost:9092")
> )
> // serialization format, e.g. JSON or Avro
> .withFormat(new Json())
> // schema of the data
> .withSchema(
> new Schema()
> .field("user_id", DataTypes.BIGINT())
> .field("item_id", DataTypes.BIGINT())
> .field("category_id", DataTypes.BIGINT())
> .field("behavior", DataTypes.STRING())
> .field("ts", DataTypes.TIMESTAMP(3))
> .rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
> )
> // name of the temporary table, which can be referenced in later SQL statements
> .createTemporaryTable("user_behavior");
>
> Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
> "\tuser_id, \n" +
> "\tCOUNT(behavior) AS behavior_cnt, \n" +
> "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
> "FROM user_behavior\n" +
> "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
> DataStream> result =
> tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
> result.print();
>
> env.execute("table api");
> }
>
> As shown in the code above, I use rowtime() method when I want to define a
> Schema. When I try to run, I get the following error: Window aggregate
> can only be defined over a time attribute column, but TIMESTAMP(3)
> encountered.
>
> I tried another method based on DDL, and it worked. So it is not my
> Kafka source problem.
>
> tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> //"proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> 在ts上定义watermark,ts成为事件时间列\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',  -- 使用 kafka connector\n"
> +
> "'connector.version' = 'universal',  -- kafka
> 版本,universal 支持 0.11 以上的版本\n" +
> "'connector.topic' = 'user_behavior',  -- kafka
> topic\n" +
> "'connector.startup-mode' = 'latest-offset',  -- 从起始
> offset 开始读取\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',  -- zookeeper 地址\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',  -- kafka broker 地址\n" +
> "'format.type' = 'json'  -- 数据源格式为 json\n" +
> ")");
>
> Hope anyone can give some suggestions. Thanks.
>


Use flink to calculate sum of the inventory under certain conditions

2020-03-02 Thread Jiawei Wu
Hi flink users,

We have a problem and think flink may be a good solution for that. But I'm
new to flink and hope can get some insights from flink community :)

Here is the problem. Suppose we have a DynamoDB table which store the
inventory data, the schema is like:

* vendorId (primary key)
* inventory name
* inventory units
* inbound time
...

This DDB table keeps changing, since we have inventory arriving and being
removed. *Every change will trigger a DynamoDB stream record.*
We need to calculate *all the inventory units that have been in stock for more
than 15 days for a specific vendor*, like this:
> select vendorId, sum(inventory units)
> from dynamodb
> where today's time - inbound time > 15
> group by vendorId
We don't want to schedule a daily batch job, so we are trying to work on a
micro-batch solution in Flink, and publish this data to another DynamoDB
table.

A draft idea is to use the total units minus the <15-day units, since both of
them have event triggers. But no detailed solution yet.

Could anyone help provide some insights here?

Thanks,
J.


Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Lu Weizheng
Hey guys,

I have been using the Flink Table API recently. I want to use event time with a Kafka 
Table Connector. I think Flink cannot recognize the event-time timestamp 
field in my code. Here is my code:

public static void main(String[] args) throws Exception {

EnvironmentSettings fsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
fsSettings);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

tEnv
// connect to the external system with the connect() method
.connect(
new Kafka()
.version("universal") // required; valid values are "0.8", "0.9", "0.10", "0.11" or "universal"
.topic("user_behavior")   // required; topic name
.startFromLatest()// where to start reading when consuming for the first time
.property("zookeeper.connect", "localhost:2181")  // Kafka connection properties
.property("bootstrap.servers", "localhost:9092")
)
// serialization format, e.g. JSON or Avro
.withFormat(new Json())
// schema of the data
.withSchema(
new Schema()
.field("user_id", DataTypes.BIGINT())
.field("item_id", DataTypes.BIGINT())
.field("category_id", DataTypes.BIGINT())
.field("behavior", DataTypes.STRING())
.field("ts", DataTypes.TIMESTAMP(3))
.rowtime(new 
Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
)
// name of the temporary table, which can be referenced in later SQL statements
.createTemporaryTable("user_behavior");

Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
"\tuser_id, \n" +
"\tCOUNT(behavior) AS behavior_cnt, \n" +
"\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
"FROM user_behavior\n" +
"GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
DataStream> result = 
tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
result.print();

env.execute("table api");
}

As shown in the code above, I use rowtime() method when I want to define a 
Schema. When I try to run, I get the following error: Window aggregate can only 
be defined over a time attribute column, but TIMESTAMP(3) encountered.

I tried another method based on DDL, and it worked. So it is not my Kafka 
source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
//"proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 
在ts上定义watermark,ts成为事件时间列\n" +
") WITH (\n" +
"'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
"'connector.version' = 'universal',  -- kafka 版本,universal 
支持 0.11 以上的版本\n" +
"'connector.topic' = 'user_behavior',  -- kafka topic\n" +
"'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 
开始读取\n" +
"'connector.properties.zookeeper.connect' = 
'localhost:2181',  -- zookeeper 地址\n" +
"'connector.properties.bootstrap.servers' = 
'localhost:9092',  -- kafka broker 地址\n" +
"'format.type' = 'json'  -- 数据源格式为 json\n" +
")");

Hope anyone can give some suggestions. Thanks.


Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread JingsongLee
Hi,

Some previous discussion in [1], FYI

[1] https://issues.apache.org/jira/browse/FLINK-10230

Best,
Jingsong Lee


--
From:Jark Wu 
Send Time: Monday, March 2, 2020, 22:42
To:Jeff Zhang 
Cc:"Gyula Fóra" ; user 
Subject:Re: SHOW CREATE TABLE in Flink SQL

big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many database 
systems also support this.
We can also introduce "describe extended table" in the future, but that is an 
orthogonal requirement. 

Best,
Jark


[1]: https://issues.apache.org/jira/browse/FLINK-16384
On Mon, 2 Mar 2020 at 22:09, Jeff Zhang  wrote:

+1 for this, maybe we can add 'describe extended table' like hive
Gyula Fóra  于2020年3月2日周一 下午8:49写道:
Hi All!

I am looking for the functionality to show how a table was created or show all 
the properties (connector, etc.)

I could only find DESCRIBE at this point which only shows the schema.

Is there anything similar to "SHOW CREATE TABLE" or is this something that we 
should maybe add in the future?

Thank you!
Gyula 

-- 
Best Regards

Jeff Zhang  

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread JingsongLee
Hi,

I've introduced LocalDateTime type information to flink-core.
But for compatibility reasons, I reverted the modification in TypeExtractor.
It seems that at present you can only use Types.LOCAL_DATE_TIME explicitly.

[1] http://jira.apache.org/jira/browse/FLINK-12850
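
For illustration, a small sketch (class and data are made up) of declaring the type
information explicitly in a DataStream job so the field is not handled as a generic/Kryo type:

import java.time.LocalDateTime;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalDateTimeTypesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // returns(...) supplies the type info that the TypeExtractor would
        // otherwise not derive for LocalDateTime.
        env.fromElements("2020-03-02T10:15:30")
            .map(LocalDateTime::parse)
            .returns(Types.LOCAL_DATE_TIME)
            .print();

        env.execute("LocalDateTime type info sketch");
    }
}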

Best,
Jingsong Lee


--
From:KristoffSC 
Send Time: Tuesday, March 3, 2020, 03:47
To:user 
Subject:Re: java.time.LocalDateTime in POJO type

Hi Tzu-Li,
I think you misunderstood Oskar's question. 
The question was whether there are any plans to support Java's
LocalDateTime in Flink's "native" de/serialization mechanism. As we can read
in [1], for basic types, Flink supports all Java primitives and their boxed
form, plus void, String, Date, BigDecimal, and BigInteger.

So we have Java Date, the question is, will there be a support for
LocalDateTime? 

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Providing hdfs name node IP for streaming file sink

2020-03-02 Thread Yang Wang
It may work. However, you would need to implement your own retry policy (similar to
`ConfiguredFailoverProxyProvider` in hadoop).
Also, if you directly use the namenode address and do not load the HDFS
configuration, some HDFS client configuration (e.g.
dfs.client.*) will not take effect.
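
To illustrate the approach from the message quoted below, a sketch assuming an HA
nameservice called "myhdfs" configured in hdfs-site.xml and HADOOP_CONF_DIR exported
on both the client and the cluster side:

// The path refers to the nameservice, not a single namenode host:port, so
// namenode failover is handled by the HDFS client configuration.
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://myhdfs/flink/test"),
        ParquetAvroWriters.forGenericRecord(schema))
    .build();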


Best,
Yang

Nick Bendtner  于2020年3月2日周一 下午11:58写道:

> Thanks a lot Yang. What are your thoughts on catching the exception when a
> name node is down and retrying with the secondary name node ?
>
> Best,
> Nick.
>
> On Sun, Mar 1, 2020 at 9:05 PM Yang Wang  wrote:
>
>> Hi Nick,
>>
>> Certainly you could directly use "namenode:port" as the schema of your
>> HDFS path.
>> Then the hadoop configs (e.g. core-site.xml, hdfs-site.xml) will not be
>> necessary.
>> However, that also means you could not benefit from the HDFS
>> high-availability[1].
>>
>> If your HDFS cluster is HA configured, I strongly suggest you set the
>> "HADOOP_CONF_DIR"
>> for your Flink application. Both the client and cluster(JM/TM) side need
>> to be set. Then
>> your HDFS path could be specified like this "hdfs://myhdfs/flink/test".
>> Given that "myhdfs"
>> is the name service configured in hdfs-site.xml.
>>
>>
>> Best,
>> Yang
>>
>>
>>
>> [1].
>> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>>
>> Nick Bendtner  于2020年2月29日周六 上午6:00写道:
>>
>>> To add to this question, do I need to setup env.hadoop.conf.dir to
>>> point to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/
>>> for the jvm ? Or is it possible to write to hdfs without any external
>>> hadoop config like core-site.xml, hdfs-site.xml ?
>>>
>>> Best,
>>> Nick.
>>>
>>>
>>>
>>> On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner 
>>> wrote:
>>>
 Hi guys,
 I am trying to write to hdfs from streaming file sink. Where should I
 provide the IP address of the name node ? Can I provide it as a part of the
 flink-config.yaml file or should I provide it like this :

 final StreamingFileSink sink = StreamingFileSink
.forBulkFormat(hdfs://namenode:8020/flink/test, 
 ParquetAvroWriters.forGenericRecord(schema))

.build();


 Best,
 Nick





Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Robert Metzger
side note: this question has been asked on SO as well:
https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
(I'm mentioning this here so that we are not wasting support resources in
our community on double-debugging issues)

On Mon, Mar 2, 2020 at 5:36 PM aj  wrote:

> Hi David,
>
> Currently, I am testing it with a single source and parallelism 1 only so
> not able to understand this behavior.
>
> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Anuj,
>>
>> What parallelism has your source? Do all of your source tasks produce
>> records? Watermark is always the minimum of timestamps seen from all the
>> upstream operators. Therefore if some of them do not produce records the
>> watermark will not progress. You can read more about Watermarks and how
>> they work here:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>
>> Hope that helps
>>
>> Best,
>>
>> Dawid
>> On 02/03/2020 16:26, aj wrote:
>>
>> I am trying to use process function to some processing on a set of
>> events. I am using event time and keystream. The issue I am facing is The
>> watermark value is always coming as 9223372036854725808. I have put print
>> statement to debug and it shows like this:
>>
>> timestamp--1583128014000 extractedTimestamp 1583128014000
>> currentwatermark-9223372036854775808
>>
>> timestamp--1583128048000 extractedTimestamp 1583128048000
>> currentwatermark-9223372036854775808
>>
>> timestamp--1583128089000 extractedTimestamp 1583128089000
>> currentwatermark-9223372036854775808
>>
>> timestamp and extracted timestamp changing but watermark not getting
>> updated. So no record is getting in the queue as context.timestamp is never
>> less than the watermark.
>>
>>
>> DataStream dataStream = 
>> env.addSource(searchConsumer).name("search_list_keyless");
>> DataStream dataStreamWithWaterMark =  
>> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>
>>try {
>> dataStreamWithWaterMark.keyBy((KeySelector> String>) record -> {
>> StringBuilder builder = new StringBuilder();
>> builder.append(record.get("session_id"));
>> builder.append(record.get("user_id"));
>> return builder.toString();
>> }).process(new MatchFunction()).print();
>> }
>> catch (Exception e){
>> e.printStackTrace();
>> }
>> env.execute("start session process");
>>
>> }
>>
>> public static class SessionAssigner implements 
>> AssignerWithPunctuatedWatermarks  {
>> @Override
>> public long extractTimestamp(GenericRecord record, long 
>> previousElementTimestamp) {
>> long timestamp = (long) record.get("event_ts");
>> System.out.println("timestamp--"+ timestamp);
>> return timestamp;
>> }
>>
>> @Override
>> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
>> extractedTimestamp) {
>> // simply emit a watermark with every event
>> System.out.println("extractedTimestamp "+extractedTimestamp);
>> return new Watermark(extractedTimestamp - 3);
>> }
>>  }
>>
>>@Override
>> public void processElement(GenericRecord record, Context context, 
>> Collector collector) throws Exception {
>>
>> TimerService timerService = context.timerService();
>> System.out.println("currentwatermark"+ 
>> timerService.currentWatermark());
>> if (context.timestamp() > timerService.currentWatermark()) {
>>
>> Tuple2> queueval = 
>> queueState.value();
>> PriorityQueue queue = queueval.f1;
>> long startTime = queueval.f0;
>> System.out.println("starttime"+ startTime);
>>
>> if (queue == null) {
>> queue = new PriorityQueue<>(10, new TimeStampComprator());
>> startTime = (long) record.get("event_ts");
>> }
>> queueState.update(new Tuple2<>(startTime, queue));
>> timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>> }
>> }
>>
>> }
>>
>> Please help me to underand what i am doing wrong.
>>
>>  --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Niels Basjes
I've put some information about my situation in the ticket
https://issues.apache.org/jira/browse/FLINK-16142?focusedCommentId=17049679=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17049679

On Mon, Mar 2, 2020 at 2:55 PM Arvid Heise  wrote:

> Hi Niels,
>
> to add to Yang. 96m is plenty of space and was heavily tested by Alibaba.
>
> The most likely reason and the motivation for the change is that you
> probably have a classloader leak in your pipeline, quite possibly by one of
> our connectors. For example, see FLINK-16142 [1].
> If you could give us more details about your pipeline, we can try to ping
> it down.
>
> However, if the error already occurs during the first start on the
> cluster, then the limit is too narrow and increasing it will already help.
> In that case, please leave us a quick comment with your used dependencies,
> so that we could potentially increase the default setting for future
> releases.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16142
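
For reference, a sketch of raising the limit in flink-conf.yaml (Flink 1.10 option
name; pick a value that fits your jobs):

# Metaspace is capped via -XX:MaxMetaspaceSize derived from this option (default 96m in 1.10).
taskmanager.memory.jvm-metaspace.size: 256m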
>
> On Mon, Mar 2, 2020 at 1:27 PM Yang Wang  wrote:
>
>> From 1.10, Flink will enable the metaspace limit via
>> "-XX:MaxMetaspaceSize"
>> by default. The default value is 96m, loading too many classes will cause
>> "OutOfMemoryError: Metaspace"[1]. So you need to increase the configured
>> value.
>>
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_trouble.html#outofmemoryerror-metaspace
>>
>>
>> Best,
>> Yang
>>
>> Niels Basjes  于2020年3月2日周一 下午7:16写道:
>>
>>> Hi,
>>>
>>> I'm running a lot of batch jobs on Kubernetes, and once in a while I get this
>>> exception.
>>> What is causing this?
>>> How can I fix this?
>>>
>>> Niels Basjes
>>>
>>> java.lang.OutOfMemoryError: Metaspace
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>> at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>> .java:142)
>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>> ChildFirstClassLoader.java:60)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>> at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>> .java:142)
>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>> ChildFirstClassLoader.java:60)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>> at org.apache.logging.log4j.LogManager.(LogManager.java:60)
>>> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
>>> ESLoggerFactory.java:45)
>>> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
>>> ESLoggerFactory.java:53)
>>> at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:
>>> 104)
>>> at org.elasticsearch.common.unit.ByteSizeValue.(
>>> ByteSizeValue.java:39)
>>> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
>>> BulkProcessor.java:88)
>>> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
>>> BulkProcessor.java:80)
>>> at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor
>>> .java:174)
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread KristoffSC
Hi Tzu-Li,
I think you misunderstood Oskar's question. 
The question was whether there are any plans to support Java's
LocalDateTime in Flink's "native" de/serialization mechanism. As we can read
in [1], for basic types, Flink supports all Java primitives and their boxed
form, plus void, String, Date, BigDecimal, and BigInteger.

So we have Java Date, the question is, will there be a support for
LocalDateTime? 

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: what is the hash function that Flink creates the UID?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

Flink currently performs a 128-bit murmur hash on the user-provided uids to
generate the final node hashes in the stream graph. Specifically, this
library is being used [1] as the hash function.

If what you are looking for is for Flink to use exactly the provided hash,
you can use `setUidHash` for that - Flink will use that provided uid hash as
is for the generated node hashes.
However, that was exposed as a means for manual workarounds to allow for
backwards compatibility in legacy breaking cases, so it is not advised to
use that in your case.

BR,
Gordon

[1]
https://guava.dev/releases/19.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)
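
For illustration, a sketch of reproducing such a node hash outside Flink, assuming the
uid string is hashed as UTF-8 with seed 0 (as in StreamGraphHasherV2; please verify
against your Flink version):

import java.nio.charset.StandardCharsets;
import com.google.common.hash.Hashing;

public class UidHashSketch {
    public static void main(String[] args) {
        String uid = "operator_name"; // the value passed to .uid(...)
        String nodeHash = Hashing.murmur3_128(0)
            .hashString(uid, StandardCharsets.UTF_8)
            .toString(); // hex string, comparable to the id in the metrics URL
        System.out.println(nodeHash);
    }
}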



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
Hi David,

Currently, I am testing it with a single source and parallelism 1 only so
not able to understand this behavior.

On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
wrote:

> Hi Anuj,
>
> What parallelism has your source? Do all of your source tasks produce
> records? Watermark is always the minimum of timestamps seen from all the
> upstream operators. Therefore if some of them do not produce records the
> watermark will not progress. You can read more about Watermarks and how
> they work here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>
> Hope that helps
>
> Best,
>
> Dawid
> On 02/03/2020 16:26, aj wrote:
>
> I am trying to use process function to some processing on a set of events.
> I am using event time and keystream. The issue I am facing is The watermark
> value is always coming as 9223372036854725808. I have put print statement
> to debug and it shows like this:
>
> timestamp--1583128014000 extractedTimestamp 1583128014000
> currentwatermark-9223372036854775808
>
> timestamp--1583128048000 extractedTimestamp 1583128048000
> currentwatermark-9223372036854775808
>
> timestamp--1583128089000 extractedTimestamp 1583128089000
> currentwatermark-9223372036854775808
>
> timestamp and extracted timestamp changing but watermark not getting
> updated. So no record is getting in the queue as context.timestamp is never
> less than the watermark.
>
>
> DataStream dataStream = 
> env.addSource(searchConsumer).name("search_list_keyless");
> DataStream dataStreamWithWaterMark =  
> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
>try {
> dataStreamWithWaterMark.keyBy((KeySelector String>) record -> {
> StringBuilder builder = new StringBuilder();
> builder.append(record.get("session_id"));
> builder.append(record.get("user_id"));
> return builder.toString();
> }).process(new MatchFunction()).print();
> }
> catch (Exception e){
> e.printStackTrace();
> }
> env.execute("start session process");
>
> }
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks  {
> @Override
> public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> System.out.println("timestamp--"+ timestamp);
> return timestamp;
> }
>
> @Override
> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> System.out.println("extractedTimestamp "+extractedTimestamp);
> return new Watermark(extractedTimestamp - 3);
> }
>  }
>
>@Override
> public void processElement(GenericRecord record, Context context, 
> Collector collector) throws Exception {
>
> TimerService timerService = context.timerService();
> System.out.println("currentwatermark"+ 
> timerService.currentWatermark());
> if (context.timestamp() > timerService.currentWatermark()) {
>
> Tuple2> queueval = 
> queueState.value();
> PriorityQueue queue = queueval.f1;
> long startTime = queueval.f0;
> System.out.println("starttime"+ startTime);
>
> if (queue == null) {
> queue = new PriorityQueue<>(10, new TimeStampComprator());
> startTime = (long) record.get("event_ts");
> }
> queueState.update(new Tuple2<>(startTime, queue));
> timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
> }
> }
>
> }
>
> Please help me to underand what i am doing wrong.
>
>  --
> Thanks & Regards,
> Anuj Jain
>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

What that LOG means (i.e. "must be processed as a Generic Type") is that
Flink will have to fall back to using Kryo for the serialization of that
type.

You should be concerned about that if:
1) That type is being used for some persisted state in snapshots. That would
be the case if you've registered state of that type, or is used as the input
for some built-in operator that persists input records in state (e.g. window
operators). Kryo generally does not have a friendly schema evolution story,
so you would want to avoid that going into production.
2) Kryo itself is not the fastest compared to Flink's POJO serializer, so
that would be something to consider as well even if the type is only used
for transient, on-wire data.

I think in your case, since your POJO contains an inner field that cannot be
recognized as a POJO (i.e. the LocalDateTime), then your outer class is also
not recognized as a POJO.
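
If you want such fallbacks to fail fast instead of silently going through Kryo, one
option (a generic sketch, not specific to your job) is to disable generic types:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Throws at job construction time whenever a type would be serialized as a
// generic type via Kryo, which makes such fields easy to spot.
env.getConfig().disableGenericTypes();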

BR,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Kaymak,

To answer your last question:
there will be no data loss in that scenario you described, but there could
be duplicate processed records.

With checkpointing enabled, the Flink Kafka consumer does not commit
offsets back to Kafka until offsets in Flink checkpoints have been
persisted.

That external offset commit, however, is not guaranteed to happen, and
always "lag" behind the offsets maintained internally in Flink checkpoints.
That is the reason for why there may be duplicate consumed records if you
rely on those on startup, instead of the offsets maintained within Flink.

The rule of thumb is:
Offsets committed back to Kafka by the Flink Kafka consumer are only a means
to expose progress to the outside world,
and there is no guarantee that those committed offsets are consistent with
operator states in the streaming job.
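
As a small illustration (a sketch; topic, group and schema are placeholders), the
offset commit back to Kafka is tied to checkpoint completion and can be toggled on
the consumer:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // With checkpointing enabled, offsets are committed back to Kafka only
        // when a checkpoint completes; they are informational and not what
        // Flink uses for recovery.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset commit sketch");
    }
}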

BR,
Gordon


On Mon, Mar 2, 2020, 11:18 PM Kaymak, Tobias 
wrote:

> Thank you! One last question regarding Gordons response. When a pipeline
> stops consuming and cleanly shuts down and there is no error during that
> process, and then it gets started again and uses the last committed offset
> in Kafka - there should be no data loss - or am I missing something?
>
> In what scenario should I expect a data loss? (I can only think of the
> jobmanager or taskmanager getting killed before the shutdown is done.)
>
> Best,
> Tobi
>
> On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Sorry for my previous slightly confusing response, please take a look at
>> the response from Gordon.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 12:05, Kaymak, Tobias  wrote:
>>
>> Hi,
>>
>> let me refine my question: My pipeline is generated from Beam, so the
>> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
>> pipeline code, working with a snapshot in Flink to stop the pipeline is not
>> an option, as the snapshot will use the old representation of the the Flink
>> pipeline when resuming from that snapshot.
>>
>> Meaning that I am looking for a way to drain the pipeline cleanly and
>> using the last committed offset in Kafka to resume processing after I
>> started it again (launching it through Beam will regenerate the Flink
>> pipeline and it should resume at the offset where it left of, that is the
>> latest committed offset in Kafka).
>>
>> Can this be achieved with a cancel or stop of the Flink pipeline?
>>
>> Best,
>> Tobias
>>
>> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Tobi,
>>>
>>> No, FlinkKafkaConsumer is not using committed Kafka’s offsets for
>>> recovery. Offsets where to start from are stored in the checkpoint itself.
>>> Updating the offsets back to Kafka is an optional, purely cosmetic thing
>>> from the Flink’s perspective, so the job will start from the correct
>>> offsets.
>>>
>>> However, if you for whatever the reason re-start the job from a
>>> savepoint/checkpoint that’s not the latest one, this will violate
>>> exactly-once guarantees - there will be some duplicated records committed
>>> two times in the sinks, as simply some records would be processed and
>>> committed twice. Committing happens on checkpoint, so if you are recovering
>>> to some previous checkpoint, there is nothing Flink can do - some records
>>> were already committed before.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2020, at 10:12, Kaymak, Tobias 
>>> wrote:
>>>
>>> Thank you Piotr!
>>>
>>> One last question - let's assume my source is a Kafka topic - if I stop
>>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>>> when restarting my job - the job would continue from the last offset that
>>> has been committed in Kafka and thus I would also not experience a loss of
>>> data in my sink. Is that correct?
>>>
>>> Best,
>>> Tobi
>>>
>>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski 
>>> wrote:
>>>
 Yes, that’s correct. There shouldn’t be any data loss. Stop with
 savepoint is a solution to make sure, that if you are stopping a job
 (either permanently or temporarily) that all of the results are
 published/committed to external systems before you actually stop the job.

 If you just cancel/kill/crash a job, in some rare cases (if a
 checkpoint was completing at the time cluster was crashing), some records
 might not be committed before the cancellation/kill/crash happened. Also
 note that doesn’t mean there is a data loss, just those records will be
 published once you restore your job from a checkpoint. If you want to stop
 the job permanently, that might not happen, hence we need stop with
 savepoint.

 Piotrek

 On 28 Feb 2020, at 15:02, Kaymak, Tobias 
 wrote:

 Thank you! For understanding the matter: When I have a streaming
 pipeline (reading from Kafka, writing somewhere) and I click "cancel" and
 after that I restart the pipeline - I should not expect any data to be lost
 - is that correct?

 Best,
 Tobias

 On 

Re: Providing hdfs name node IP for streaming file sink

2020-03-02 Thread Nick Bendtner
Thanks a lot Yang. What are your thoughts on catching the exception when a
name node is down and retrying with the secondary name node ?

Best,
Nick.

On Sun, Mar 1, 2020 at 9:05 PM Yang Wang  wrote:

> Hi Nick,
>
> Certainly you could directly use "namenode:port" as the schema of your HDFS
> path.
> Then the hadoop configs (e.g. core-site.xml, hdfs-site.xml) will not be
> necessary.
> However, that also means you could not benefit from the HDFS
> high-availability[1].
>
> If your HDFS cluster is HA configured, I strongly suggest you set the
> "HADOOP_CONF_DIR"
> for your Flink application. Both the client and cluster(JM/TM) side need
> to be set. Then
> your HDFS path could be specified like this "hdfs://myhdfs/flink/test".
> Given that "myhdfs"
> is the name service configured in hdfs-site.xml.
>
>
> Best,
> Yang
>
>
>
> [1].
> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>
> Nick Bendtner  于2020年2月29日周六 上午6:00写道:
>
>> To add to this question, do I need to setup env.hadoop.conf.dir to point
>> to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/ for
>> the jvm ? Or is it possible to write to hdfs without any external hadoop
>> config like core-site.xml, hdfs-site.xml ?
>>
>> Best,
>> Nick.
>>
>>
>>
>> On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner 
>> wrote:
>>
>>> Hi guys,
>>> I am trying to write to hdfs from streaming file sink. Where should I
>>> provide the IP address of the name node ? Can I provide it as a part of the
>>> flink-config.yaml file or should I provide it like this :
>>>
>>> final StreamingFileSink sink = StreamingFileSink
>>> .forBulkFormat(hdfs://namenode:8020/flink/test, 
>>> ParquetAvroWriters.forGenericRecord(schema))
>>>
>>> .build();
>>>
>>>
>>> Best,
>>> Nick
>>>
>>>
>>>


what is the hash function that Flink creates the UID?

2020-03-02 Thread Felipe Gutierrez
Hi there!

I am tracking the latency of my operators using
"setLatencyTrackingInterval(1)" and I can see the latency metrics on
the browser http://127.0.0.1:8081/jobs//metrics . For each logical
operator I set a .uid("operator_name") and I know that Flink uses the
UidHash to create a string for each operator. For example my operator "A"
has the hash code "2e588ce1c86a9d46e2e85186773ce4fd".

What is the hash function used to define this hash code?

I want to use the same hash function to be able to automatically monitor
the 99th percentile latency. AFAIK, Flink does not provide a way to create
an operator ID that has the operator name included [1][2]. Is there a
specific reason for that?

[1] https://issues.apache.org/jira/browse/FLINK-8592
[2] https://issues.apache.org/jira/browse/FLINK-9653

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Dawid Wysakowicz
Hi Anuj,

What parallelism does your source have? Do all of your source tasks produce
records? Watermark is always the minimum of timestamps seen from all the
upstream operators. Therefore if some of them do not produce records the
watermark will not progress. You can read more about Watermarks and how
they work here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams

Hope that helps

Best,

Dawid

On 02/03/2020 16:26, aj wrote:
>
> I am trying to use process function to some processing on a set of
> events. I am using event time and keystream. The issue I am facing is
> The watermark value is always coming as 9223372036854725808. I have
> put print statement to debug and it shows like this:
>
> timestamp--1583128014000 extractedTimestamp 1583128014000
> currentwatermark-9223372036854775808
>
> timestamp--1583128048000 extractedTimestamp 1583128048000
> currentwatermark-9223372036854775808
>
> timestamp--1583128089000 extractedTimestamp 1583128089000
> currentwatermark-9223372036854775808
>
> timestamp and extracted timestamp changing but watermark not getting
> updated. So no record is getting in the queue as context.timestamp is
> never less than the watermark.
>
>
> DataStream dataStream = env.addSource(searchConsumer).name("search_list_keyless");
> DataStream dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
> try {
>     dataStreamWithWaterMark.keyBy((KeySelector) record -> {
>         StringBuilder builder = new StringBuilder();
>         builder.append(record.get("session_id"));
>         builder.append(record.get("user_id"));
>         return builder.toString();
>     }).process(new MatchFunction()).print();
> }
> catch (Exception e){
>     e.printStackTrace();
> }
> env.execute("start session process");
> }
>
> public static class SessionAssigner implements AssignerWithPunctuatedWatermarks {
>     @Override
>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         System.out.println("timestamp--"+ timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>         // simply emit a watermark with every event
>         System.out.println("extractedTimestamp "+extractedTimestamp);
>         return new Watermark(extractedTimestamp - 3);
>     }
> }
>
> @Override
> public void processElement(GenericRecord record, Context context, Collector collector) throws Exception {
>     TimerService timerService = context.timerService();
>     System.out.println("currentwatermark"+ timerService.currentWatermark());
>     if (context.timestamp() > timerService.currentWatermark()) {
>
>         Tuple2> queueval = queueState.value();
>         PriorityQueue queue = queueval.f1;
>         long startTime = queueval.f0;
>         System.out.println("starttime"+ startTime);
>
>         if (queue == null) {
>             queue = new PriorityQueue<>(10, new TimeStampComprator());
>             startTime = (long) record.get("event_ts");
>         }
>         queueState.update(new Tuple2<>(startTime, queue));
>         timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>     }
> }
> }
> Please help me to underand what i am doing wrong.
> -- 
> Thanks & Regards,
> Anuj Jain
>
>
>




Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
I am trying to use a process function to do some processing on a set of events.
I am using event time and a keyed stream. The issue I am facing is that the watermark
value is always coming as 9223372036854725808. I have put print statements
to debug and they show this:

timestamp--1583128014000 extractedTimestamp 1583128014000
currentwatermark-9223372036854775808

timestamp--1583128048000 extractedTimestamp 1583128048000
currentwatermark-9223372036854775808

timestamp--1583128089000 extractedTimestamp 1583128089000
currentwatermark-9223372036854775808

timestamp and extracted timestamp changing but watermark not getting
updated. So no record is getting in the queue as context.timestamp is never
less than the watermark.


DataStream dataStream =
env.addSource(searchConsumer).name("search_list_keyless");
DataStream dataStreamWithWaterMark =
dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

   try {
dataStreamWithWaterMark.keyBy((KeySelector) record -> {
StringBuilder builder = new StringBuilder();
builder.append(record.get("session_id"));
builder.append(record.get("user_id"));
return builder.toString();
}).process(new MatchFunction()).print();
}
catch (Exception e){
e.printStackTrace();
}
env.execute("start session process");

}

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks  {
@Override
public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
System.out.println("timestamp--"+ timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord
record, long extractedTimestamp) {
// simply emit a watermark with every event
System.out.println("extractedTimestamp "+extractedTimestamp);
return new Watermark(extractedTimestamp - 3);
}
 }

   @Override
public void processElement(GenericRecord record, Context context,
Collector collector) throws Exception {

TimerService timerService = context.timerService();
System.out.println("currentwatermark"+
timerService.currentWatermark());
if (context.timestamp() > timerService.currentWatermark()) {

Tuple2> queueval =
queueState.value();
PriorityQueue queue = queueval.f1;
long startTime = queueval.f0;
System.out.println("starttime"+ startTime);

if (queue == null) {
queue = new PriorityQueue<>(10, new TimeStampComprator());
startTime = (long) record.get("event_ts");
}
queueState.update(new Tuple2<>(startTime, queue));
timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
}
}

}

Please help me understand what I am doing wrong.


-- 
Thanks & Regards,
Anuj Jain






Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Thank you! One last question regarding Gordon's response. When a pipeline
stops consuming and cleanly shuts down and there is no error during that
process, and then it gets started again and uses the last committed offset
in Kafka - there should be no data loss - or am I missing something?

In what scenario should I expect a data loss? (I can only think of the
jobmanager or taskmanager getting killed before the shutdown is done.)

Best,
Tobi

On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski  wrote:

> Hi,
>
> Sorry for my previous slightly confusing response, please take a look at
> the response from Gordon.
>
> Piotrek
>
> On 2 Mar 2020, at 12:05, Kaymak, Tobias  wrote:
>
> Hi,
>
> let me refine my question: My pipeline is generated from Beam, so the
> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
> pipeline code, working with a snapshot in Flink to stop the pipeline is not
> an option, as the snapshot will use the old representation of the the Flink
> pipeline when resuming from that snapshot.
>
> Meaning that I am looking for a way to drain the pipeline cleanly and
> using the last committed offset in Kafka to resume processing after I
> started it again (launching it through Beam will regenerate the Flink
> pipeline and it should resume at the offset where it left of, that is the
> latest committed offset in Kafka).
>
> Can this be achieved with a cancel or stop of the Flink pipeline?
>
> Best,
> Tobias
>
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski 
> wrote:
>
>> Hi Tobi,
>>
>> No, FlinkKafkaConsumer is not using committed Kafka’s offsets for
>> recovery. Offsets where to start from are stored in the checkpoint itself.
>> Updating the offsets back to Kafka is an optional, purely cosmetic thing
>> from the Flink’s perspective, so the job will start from the correct
>> offsets.
>>
>> However, if you for whatever the reason re-start the job from a
>> savepoint/checkpoint that’s not the latest one, this will violate
>> exactly-once guarantees - there will be some duplicated records committed
>> two times in the sinks, as simply some records would be processed and
>> committed twice. Committing happens on checkpoint, so if you are recovering
>> to some previous checkpoint, there is nothing Flink can do - some records
>> were already committed before.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias  wrote:
>>
>> Thank you Piotr!
>>
>> One last question - let's assume my source is a Kafka topic - if I stop
>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>> when restarting my job - the job would continue from the last offset that
>> has been committed in Kafka and thus I would also not experience a loss of
>> data in my sink. Is that correct?
>>
>> Best,
>> Tobi
>>
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski 
>> wrote:
>>
>>> Yes, that’s correct. There shouldn’t be any data loss. Stop with
>>> savepoint is a solution to make sure, that if you are stopping a job
>>> (either permanently or temporarily) that all of the results are
>>> published/committed to external systems before you actually stop the job.
>>>
>>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
>>> was completing at the time cluster was crashing), some records might not be
>>> committed before the cancellation/kill/crash happened. Also note that
>>> doesn’t mean there is a data loss, just those records will be published
>>> once you restore your job from a checkpoint. If you want to stop the job
>>> permanently, that might not happen, hence we need stop with savepoint.
>>>
>>> Piotrek
>>>
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias 
>>> wrote:
>>>
>>> Thank you! For understanding the matter: When I have a streaming
>>> pipeline (reading from Kafka, writing somewhere) and I click "cancel" and
>>> after that I restart the pipeline - I should not expect any data to be lost
>>> - is that correct?
>>>
>>> Best,
>>> Tobias
>>>
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski 
>>> wrote:
>>>
 Thanks for confirming that Yadong. I’ve created a ticket for that [1].

 Piotrek

 [1] https://issues.apache.org/jira/browse/FLINK-16340

 On 28 Feb 2020, at 14:32, Yadong Xie  wrote:

 Hi

 1. the old stop button was removed in flink 1.9.0 since it could not
 work properly as I know
 2. if we have the feature of the stop with savepoint, we could add it
 to the web UI, but it may still need some work on the rest API to support
 the new feature


 Best,
 Yadong


 Piotr Nowojski  于2020年2月28日周五 下午8:49写道:

> Hi,
>
> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my
> knowledge and research:
>
> 1. In Flink 1.9 we switched from the old webUI to a new one, that
> probably explains the difference you are seeing.
 2. The “Stop” button in the old webUI was not working properly - that
> was not stop with savepoint, as stop 

Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Austin Cawley-Edwards
Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put
together an example repo that reproduces the issue[1], because I did have
checkpointing enabled in my previous case -- still must be doing something
wrong with that config though.

Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression
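
(For context, the checkpointing requirement discussed in the replies below boils
down to a single setting in the job setup. A minimal sketch - the interval is an
arbitrary example value:)

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // StreamingFileSink only finalizes/commits in-progress part files on checkpoints,
    // so without this the bulk-format output may never be completed.
    env.enableCheckpointing(10_000); // checkpoint every 10 seconds (illustrative)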


On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas  wrote:

> Hi Austin,
>
> Dawid is correct in that you need to enable checkpointing for the
> StreamingFileSink to work.
>
> I hope this solves the problem,
> Kostas
>
> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>  wrote:
> >
> > Hi Austin,
> >
> > If I am not mistaken the StreamingFileSink by default flushes on
> checkpoints. If you don't have checkpoints enabled it might happen that not
> all data is flushed.
> >
> > I think you can also adjust that behavior with:
> >
> > forBulkFormat(...)
> >
> > .withRollingPolicy(/* your custom logic */)
> >
> > I also cc Kostas who should be able to correct me if I am wrong.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> >
> > Hi there,
> >
> > Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid files
> using the same APIs and data on their own, so I think it must be something
> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
> with the `file://` protocol.
> >
> > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
> version 1.20.
> >
> > Here's a runnable example of the issue:
> >
> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> > import
> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> > import org.apache.flink.api.common.serialization.BulkWriter;
> > import org.apache.flink.core.fs.FSDataOutputStream;
> > import org.apache.flink.core.fs.Path;
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >
> > import java.io.FileOutputStream;
> > import java.io.IOException;
> > import java.io.Serializable;
> > import java.nio.charset.StandardCharsets;
> >
> > class Scratch {
> >   public static class Record implements Serializable {
> > private static final long serialVersionUID = 1L;
> >
> > String id;
> >
> > public Record() {}
> >
> > public Record(String id) {
> >   this.id = id;
> > }
> >
> > public String getId() {
> >   return id;
> > }
> >
> > public void setId(String id) {
> >   this.id = id;
> > }
> >   }
> >
> >   public static void main(String[] args) throws Exception {
> > final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > TarArchiveOutputStream taos = new TarArchiveOutputStream(new
> GzipCompressorOutputStream(new
> FileOutputStream("/home/austin/Downloads/test.tgz")));
> > TarArchiveEntry fileEntry = new
> TarArchiveEntry(String.format("%s.txt", "test"));
> > String fullText = "hey\nyou\nwork";
> > byte[] fullTextData = fullText.getBytes();
> > fileEntry.setSize(fullTextData.length);
> > taos.putArchiveEntry(fileEntry);
> > taos.write(fullTextData, 0, fullTextData.length);
> > taos.closeArchiveEntry();
> > taos.flush();
> > taos.close();
> >
> > StreamingFileSink textSink = StreamingFileSink
> > .forBulkFormat(new
> Path("file:///home/austin/Downloads/text-output"),
> > new BulkWriter.Factory() {
> >   @Override
> >   public BulkWriter create(FSDataOutputStream out)
> throws IOException {
> > final TarArchiveOutputStream compressedOutputStream =
> new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
> >
> > return new BulkWriter() {
> >   @Override
> >   public void addElement(Record record) throws
> IOException {
> > TarArchiveEntry fileEntry = new
> TarArchiveEntry(String.format("%s.txt", record.id));
> > byte[] fullTextData =
> "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
> > fileEntry.setSize(fullTextData.length);
> > compressedOutputStream.putArchiveEntry(fileEntry);
> > compressedOutputStream.write(fullTextData, 0,
> fullTextData.length);
> > compressedOutputStream.closeArchiveEntry();
> >   }
> >
> >   @Override
> >   public void flush() throws IOException {
> > compressedOutputStream.flush();
> >   }
> >
> >   @Override
> >   public void finish() throws 

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Gyula Fóra
Thanks for the positive feedback and creating the JIRA ticket :)

Gyula

On Mon, Mar 2, 2020 at 3:15 PM Jark Wu  wrote:

> big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many
> database systems also support this.
> We can also introduce "describe extended table" in the future, but that is
> an orthogonal requirement.
>
> Best,
> Jark
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-16384
>
> On Mon, 2 Mar 2020 at 22:09, Jeff Zhang  wrote:
>
>> +1 for this, maybe we can add 'describe extended table' like hive
>>
>> Gyula Fóra  于2020年3月2日周一 下午8:49写道:
>>
>>> Hi All!
>>>
>>> I am looking for the functionality to show how a table was created or
>>> show all the properties (connector, etc.)
>>>
>>> I could only find DESCRIBE at this point which only shows the schema.
>>>
>>> Is there anything similar to "SHOW CREATE TABLE" or is this something
>>> that we should maybe add in the future?
>>>
>>> Thank you!
>>> Gyula
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jark Wu
big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many
database systems also support this.
We can also introduce "describe extended table" in the future, but that is
an orthogonal requirement.

Best,
Jark


[1]: https://issues.apache.org/jira/browse/FLINK-16384

On Mon, 2 Mar 2020 at 22:09, Jeff Zhang  wrote:

> +1 for this, maybe we can add 'describe extended table' like hive
>
> Gyula Fóra  于2020年3月2日周一 下午8:49写道:
>
>> Hi All!
>>
>> I am looking for the functionality to show how a table was created or
>> show all the properties (connector, etc.)
>>
>> I could only find DESCRIBE at this point which only shows the schema.
>>
>> Is there anything similar to "SHOW CREATE TABLE" or is this something
>> that we should maybe add in the future?
>>
>> Thank you!
>> Gyula
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


java.time.LocalDateTime in POJO type

2020-03-02 Thread OskarM
Hi all,

I wanted to use a LocalDateTime field in my POJO class used in a Flink
pipeline. However, when I run the job I can see the following statements in
the logs:

/class java.time.LocalDateTime does not contain a getter for field date
class java.time.LocalDateTime does not contain a setter for field date
Class class java.time.LocalDateTime cannot be used as a POJO type because
not all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on "Data Types & Serialization" for
details of the effect on performance./
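
(For context, a POJO of roughly the following shape - the class and field names
are assumptions for illustration, not taken from the original post - produces
log lines like the ones above, because the LocalDateTime-typed field is analysed
separately by the type extractor:)

    import java.time.LocalDateTime;

    public class Event {
        public String id;
        public LocalDateTime date; // this field triggers the GenericType messages above
        public Event() {}
    }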

I don't see any mention of my POJO type itself. Does this mean that my class
is still being handled by Flink's internal serializer instead of Kryo / some
other fallback mechanism?
Should I be concerned about the logs mentioned above?

The Flink version I use is Apache Flink 1.10.0 for Scala 2.12, and I can see
TypeInformations dedicated to the java.time API in
org.apache.flink.api.common.typeinfo.Types in the library itself.

Best regards,
Oskar



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jeff Zhang
+1 for this, maybe we can add 'describe extended table' like hive

Gyula Fóra  于2020年3月2日周一 下午8:49写道:

> Hi All!
>
> I am looking for the functionality to show how a table was created or show
> all the properties (connector, etc.)
>
> I could only find DESCRIBE at this point which only shows the schema.
>
> Is there anything similar to "SHOW CREATE TABLE" or is this something that
> we should maybe add in the future?
>
> Thank you!
> Gyula
>


-- 
Best Regards

Jeff Zhang


Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Arvid Heise
Hi Niels,

To add to Yang: 96m is plenty of space and was heavily tested by Alibaba.

The most likely reason - and the motivation for the change - is that you
probably have a classloader leak in your pipeline, quite possibly caused by one
of our connectors. For example, see FLINK-16142 [1].
If you could give us more details about your pipeline, we can try to pin
it down.

However, if the error already occurs during the first start on the cluster,
then the limit is too narrow and increasing it will already help. In that
case, please leave us a quick comment with the dependencies you use, so that
we can potentially increase the default setting for future releases.

[1] https://issues.apache.org/jira/browse/FLINK-16142
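
If you do need to raise the limit, it is controlled through the Flink 1.10
memory options, e.g. in flink-conf.yaml (the value below is only an example;
see the troubleshooting page linked in Yang's reply for details):

    taskmanager.memory.jvm-metaspace.size: 256m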

On Mon, Mar 2, 2020 at 1:27 PM Yang Wang  wrote:

> From 1.10, Flink will enable the metaspace limit via
> "-XX:MaxMetaspaceSize"
> by default. The default value is 96m; loading too many classes will cause
> "OutOfMemoryError: Metaspace"[1]. So you need to increase the configured
> value.
>
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_trouble.html#outofmemoryerror-metaspace
>
>
> Best,
> Yang
>
> Niels Basjes  于2020年3月2日周一 下午7:16写道:
>
>> Hi,
>>
>> I'm running a lot of batch jobs on Kubernetes once in a while I get this
>> exception.
>> What is causing this?
>> How can I fix this?
>>
>> Niels Basjes
>>
>> java.lang.OutOfMemoryError: Metaspace
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader
>> .java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>> ChildFirstClassLoader.java:60)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader
>> .java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>> ChildFirstClassLoader.java:60)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>> at org.apache.logging.log4j.LogManager.(LogManager.java:60)
>> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
>> ESLoggerFactory.java:45)
>> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
>> ESLoggerFactory.java:53)
>> at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:
>> 104)
>> at org.elasticsearch.common.unit.ByteSizeValue.(ByteSizeValue
>> .java:39)
>> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
>> BulkProcessor.java:88)
>> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
>> BulkProcessor.java:80)
>> at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor
>> .java:174)
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>


Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi Arvid,

It’s actually the second case. I just wanted to build a scalable generic setup 
where I can pass a set of Kafka topics and my consumer can use the same 
AvroDeserializationSchema. But yeah, I think I’ll do the fetching of the latest 
schema in main() separately.

Thanks for the help!
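
(For other readers of this thread, a sketch of the GenericRecord route discussed
below; the topic, registry URL, bootstrap servers and the schemaJson variable are
placeholders/assumptions, not code from this thread:)

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Assumed: the latest writer schema (schemaJson) is fetched once, e.g. in main().
    Schema schema = new Schema.Parser().parse(schemaJson);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder

    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "sensor-topic", // placeholder topic
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://registry:8081"), // placeholder URL
        props);

    env.addSource(consumer)
       .map(r -> r) // any transformation is a black box to Flink...
       .returns(new GenericRecordAvroTypeInfo(schema)); // ...so the type info must be provided again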

> On 02-Mar-2020, at 3:50 PM, Arvid Heise  wrote:
> 
> I didn't get the use case completely. Are you using several sensors with 
> different schemas? Are you processing them jointly?
> 
> Let's assume some cases:
> 1) Only one format: it would be best to generate a case class with 
> avrohugger. That is especially true if your processing actually requires 
> specific fields to be present.
> 2) Several sensors, but processed independently. You could do the same as 1) 
> for all sensors. If you don't need to access specific fields, you should 
> fetch the latest schema in your main() and use all the things that Flink provides.
> 3) You have constantly changing schemas and want to forward records always 
> with the latest schema enriched with some fields. You need to stick to 
> GenericRecord. I'd go with the byte[] approach of my first response if you 
> only have one such application / processing step.
> 4) Else go with the custom TypeInfo/Serializer. We can help you to implement 
> it. If you can do it yourself, it'd be awesome to post it as a response here 
> for other users.
> 
> On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant  > wrote:
> Hi,
> 
> So I am building a data pipeline that takes input from sensors via an MQTT 
> broker and passes it to Kafka. Before it goes to Kafka, I am filtering the 
> data and serializing the filtered data into Avro format, keeping the schema 
> in the registry. Now I want to get that data into Flink to process it using 
> some algorithms. So, at the FlinkKafkaConsumer end, I currently don’t have the 
> schemas for my data. One workaround for me would be to separately fetch the 
> schema corresponding to the data that I’ll be getting from a topic from the 
> registry and then work forward, but I was hoping there would be a way to avoid 
> this and integrate the schema registry with my consumer in some way, like 
> kafka-connect does. This is why I was trying this solution.
> 
> Do you think I should maybe go with the workaround, as implementing a 
> GenericRecord-based approach would be more of an overhead in the long run?
> 
> Thanks!
> 
> 
>> On 02-Mar-2020, at 3:11 PM, Arvid Heise > > wrote:
>> 
>> Could you please give more background on your use case? It's hard to give 
>> any advice with the little information you gave us.
>> 
>> Usually, the consumer should know the schema or else it's hard to do 
>> meaningful processing.
>> If it's something completely generic, then there is no way around it, but 
>> that should be the last resort. Here my recommendations from my first 
>> response would come into play.
>> 
>> If they are not working for you for some reason, please let me know why and 
>> I could come up with a solution.
>> 
>> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant > > wrote:
>> Hi,
>> 
>> Thanks for the replies. I get that it is not wise to use GenericRecord and 
>> that is what is causing the Kryo fallback, but then, if not this, how should 
>> I go about writing an AvroSchemaRegistrySchema when I don’t know the 
>> schema? Without knowledge of the schema, I can’t create a class. Can you 
>> suggest a way of getting around that?
>> 
>> Thanks!
>> 
>>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz >> > wrote:
>>> 
>>> Hi Nitish,
>>> 
>>> Just to slightly extend on Arvid's reply. As Arvid said the Kryo serializer 
>>> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). 
>>> As a GenericRecord is not a POJO, this call will produce a GenericTypeInfo, 
>>> which uses Kryo serialization.
>>> 
>>> For a reference example I would recommend having a look at 
>>> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for 
>>> working with GenericRecords. One important note: GenericRecords are not the 
>>> best candidates for data objects in Flink. The reason is that if you apply any 
>>> transformation on a GenericRecord, e.g. map/flatMap, the input type 
>>> information cannot be forwarded, as the transformation is a black box from 
>>> Flink's perspective. Therefore you would need to provide the type 
>>> information for every step of the pipeline:
>>> 
>>> TypeInformation info = ...
>>> 
>>> sEnv.addSource(...) // produces info
>>> 
>>> .map(...)
>>> 
>>> .returns(info) // must be provided again, as the map transformation is a 
>>> black box, the transformation might produce a completely different record
>>> 
>>> Hope that helps a bit.
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> On 02/03/2020 09:04, Arvid Heise wrote:
 Hi Nitish,
 
 Kryo is the fallback serializer of Flink when everything else fails. In 
 general, performance suffers quite a bit and it's not 

Re: How JobManager and TaskManager find each other?

2020-03-02 Thread Yang Wang
Hi KristoffSC,

Regarding your questions inline.

> 1. task deployment descriptor
The `TaskDeploymentDescriptor` is used by the JobMaster to submit a task to
a TaskManager.
Since the JobMaster knows all the operators and their locations, it will put
the upstream operator locations
in the `TaskDeploymentDescriptor`. So when the task is running, it always
knows how to communicate
with the others.

> 2. Kubernetes job cluster
When you deploy on Kubernetes, it is very different from NAT in a PaaS.
Kubernetes always has a
default overlay network. Each JobManager/TaskManager (i.e. Kubernetes Pod)
will be assigned
a unique hostname and IP [1]. They can talk to each other directly. So you
do not need to set any
bind-host or bind-port.

> 3. Modify jobmanager.rpc.address
You need to create a Kubernetes service and set
`jobmanager.rpc.address` to the service name.
This is used for JobManager fault tolerance. When the JobManager fails
and is relaunched,
the TaskManagers can still use the service name to re-register with the
JobManager.
You do not need to update conf/slaves - just follow the guide [2].
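
As a concrete sketch, the relevant flink-conf.yaml lines would look roughly like
this (the service name below is an assumption taken from the example manifests,
not from this thread):

    jobmanager.rpc.address: flink-jobmanager   # the Kubernetes Service name
    jobmanager.rpc.port: 6123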


[1].
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
[2].
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes


Best,
Yang

KristoffSC  于2020年3月2日周一 下午3:25写道:

> Thanks about clarification for NAT,
>
> Moving NAT issue aside for a moment",
>
> Is the process of sending "task deployment descriptor" that you mentioned
> in
> "Feb 26, 2020; 4:18pm" a specially the process of notifying TaskManager
> about IP of participating TaskManagers in job described somewhere? I'm
> familiar with [1] [2] but in there there is no information about sending
> the
> IP information of Task managers.
>
>
> Another question is how this all sums for Kubernetes Job Session Cluster
> deployment when nodes will be deployed across many physical machines inside
> Kubernetes cluster.
> If I'm using Kubernetes like described in [3]
>
> The final question would be, do I have to modify jobmanager.rpc.address and
> flink/conf/slaves file when running Docker JobCluster on Kubernetes. The
> default values are localhost.
> Or just following [3] will be fine?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
> [3]
>
> https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread Benchao Li
I just run it in my IDE.

sunfulin  于2020年3月2日周一 下午9:04写道:

>
>
> Hi,
> Yep, I am using 1.10
> Did you submit the job to the cluster or just run it in your IDE? Because
> I can also run it successfully in my IDE, but cannot run it on the cluster
> with a shaded jar. So I think maybe the problem is related to the Maven jar
> classpath. But I am not sure about that.
>
> If you can submit the job as a shaded jar to the cluster, could you share
> the project pom settings and sample test code?
>
>
>
>
> At 2020-03-02 20:36:06, "Benchao Li"  wrote:
> >Hi fulin,
> >
> >I cannot reproduce your exception on current master using your SQLs. I
> >searched the error message; it seems that this issue[1] is similar to
> >yours, but it seems that the current compile util does not have this issue.
> >
> >BTW, are you using 1.10?
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-7490
> >
> >sunfulin  于2020年3月2日周一 上午11:17写道:
> >
> >>
> >>
> >>
> >> *create table **lscsp_sc_order_all *(
> >>   amount *varchar  *,
> >>   argType *varchar*,
> >>   balance *varchar*,
> >>   branchNo *varchar  *,
> >>   businessType *varchar *,
> >>   channelType *varchar *,
> >>   counterOrderNo *varchar  *,
> >>   counterRegisteredDate *varchar*,
> >>   custAsset *varchar  *,
> >>   customerNumber *varchar*,
> >>   customerType *varchar*,
> >>   discountId *varchar*,
> >>   doubleRecordFlag *varchar*,
> >>   doubleRecordType *varchar*,
> >>   exceedFlag *varchar*,
> >>   fundAccount *varchar*,
> >>   fundCode *varchar*,
> >>   fundCompany *varchar*,
> >>   fundName *varchar*,
> >>   fundRecruitmentFlag *varchar*,
> >>   id *varchar*,
> >>   lastUpdateTime *varchar*,
> >>   opBranchNo *varchar*,
> >>   opStation *varchar*,
> >>   orderNo *varchar*,
> >>   orgEntrustNo *varchar*,
> >>   orgOrderNo *varchar*,
> >>   prodId *varchar*,
> >>   prodInvestorType *varchar*,
> >>   prodLeafType *varchar*,
> >>   prodRisk *varchar*,
> >>   prodRiskFlag *varchar*,
> >>   prodRootType *varchar*,
> >>   prodTerm *varchar*,
> >>   prodVariety *varchar*,
> >>   quaInvestorFlag *varchar*,
> >>   quaInvestorSource *varchar*,
> >>   quickPurchaseFlag *varchar*,
> >>   remark *varchar*,
> >>   remark1 *varchar*,
> >>   remark2 *varchar*,
> >>   remark3 *varchar*,
> >>   riskFlag *varchar*,
> >>   scRcvTime *varchar*,
> >>   scSendTime *varchar*,
> >>   signId *varchar*,
> >>   signSpecialRiskFlag *varchar*,
> >>   source *varchar*,
> >>   *status** varchar*,
> >>   subRiskFlag *varchar*,
> >>   sysNodeId *varchar*,
> >>   taSerialNo *varchar*,
> >>   termFlag *varchar*,
> >>   token *varchar*,
> >>   tradeConfirmDate *varchar*,
> >>   transFundCode *varchar*,
> >>   transProdId *varchar*,
> >>   varietyFlag *varchar*,
> >>   zlcftProdType *varchar*,
> >>   proctime *as *PROCTIME()
> >> *-- 通过计算列产生一个处理时间列*)
> >>
> >> *with*(
> >>   *'connector.type' *= *'kafka'*,
> >> *-- 使用 kafka connector  **'connector.version' *= *'0.10'*,
> >> *-- kafka 版本,universal 支持 0.11 以上的版本  **'connector.topic' *= *''*,
> >>
> >> *-- kafka topic  **'connector.startup-mode' *= *'group-offsets'*,
> >> *-- 从起始 offset 开始读取  **'connector.properties.zookeeper.connect' *=
> >> *''*,
> >> *-- zookeeper 地址  **'connector.properties.bootstrap.servers' *=
> >> *''*,
> >> *-- kafka broker 地址  **'connector.properties.group.id
> >> ' *=
> >> *'acrm-realtime-saleorder-consumer-1'*,
> >>   *'format.type' *= *'json'  *
> >> *-- 数据源格式为 json*)
> >>
> >>
> >> *CREATE TABLE **dim_app_cust_info *(
> >> cust_id *varchar *,
> >> open_comp_name *varchar *,
> >> open_comp_id *varchar *,
> >> org_name *varchar *,
> >> org_id *varchar*,
> >> comp_name *varchar *,
> >> comp_id *varchar *,
> >> mng_name *varchar *,
> >> mng_id *varchar *,
> >> is_tg *varchar *,
> >> cust_name *varchar *,
> >> cust_type *varchar*,
> >> avg_tot_aset_y365 *double *,
> >> avg_aset_create_y
> >> *double*) *WITH *(
> >> *'connector.type' *= *'jdbc'*,
> >> *'connector.url' *= *''*,
> >> *'connector.table' *= *'app_cust_serv_rel_info'*,
> >> *'connector.driver' *= *'com.mysql.jdbc.Driver'*,
> >> *'connector.username' *= *'admin'*,
> >> *'connector.password' *= *'Windows7'*,
> >> *'connector.lookup.cache.max-rows' *= *'8000'*,
> >> *'connector.lookup.cache.ttl' *= *'30min'*,
> >> *'connector.lookup.max-retries' *=
> >> *'3'*)
> >>
> >>
> >>
> >> At 2020-03-02 09:16:05, "Benchao Li"  wrote:
> >> >Could you also provide us the DDL for lscsp_sc_order_all
> >> >and dim_app_cust_info ?
> >> >
> >> >sunfulin  于2020年3月1日周日 下午9:22写道:
> >> >
> >> >>
> >> >> *CREATE TABLE **realtime_product_sell *(
> >> >>   sor_pty_id *varchar*,
> >> >>   entrust_date *varchar*,
> >> >>   entrust_time *varchar*,
> >> >>   product_code *varchar *,
> >> >>   business_type *varchar *,
> >> >>   balance *double *,
> >> >>   cust_name *varchar *,
> >> >>   open_comp_name *varchar *,
> >> >>   open_comp_id *varchar *,
> >> >>   org_name *varchar *,
> >> >>   org_id *varchar *,
> >> >>  

Re:Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread sunfulin




Hi, 
Yep, I am using 1.10.
Did you submit the job to the cluster or just run it in your IDE? Because I can 
also run it successfully in my IDE, but cannot run it on the cluster with a 
shaded jar. So I think maybe the problem is related to the Maven jar classpath, 
but I am not sure about that.


If you can submit the job as a shaded jar to the cluster, could you share the 
project pom settings and sample test code?







At 2020-03-02 20:36:06, "Benchao Li"  wrote:
>Hi fulin,
>
>I cannot reproduce your exception on current master using your SQLs. I
>searched the error message; it seems that this issue[1] is similar to
>yours, but it seems that the current compile util does not have this issue.
>
>BTW, are you using 1.10?
>
>[1] https://issues.apache.org/jira/browse/FLINK-7490
>
>sunfulin  于2020年3月2日周一 上午11:17写道:
>
>>
>>
>>
>> *create table **lscsp_sc_order_all *(
>>   amount *varchar  *,
>>   argType *varchar*,
>>   balance *varchar*,
>>   branchNo *varchar  *,
>>   businessType *varchar *,
>>   channelType *varchar *,
>>   counterOrderNo *varchar  *,
>>   counterRegisteredDate *varchar*,
>>   custAsset *varchar  *,
>>   customerNumber *varchar*,
>>   customerType *varchar*,
>>   discountId *varchar*,
>>   doubleRecordFlag *varchar*,
>>   doubleRecordType *varchar*,
>>   exceedFlag *varchar*,
>>   fundAccount *varchar*,
>>   fundCode *varchar*,
>>   fundCompany *varchar*,
>>   fundName *varchar*,
>>   fundRecruitmentFlag *varchar*,
>>   id *varchar*,
>>   lastUpdateTime *varchar*,
>>   opBranchNo *varchar*,
>>   opStation *varchar*,
>>   orderNo *varchar*,
>>   orgEntrustNo *varchar*,
>>   orgOrderNo *varchar*,
>>   prodId *varchar*,
>>   prodInvestorType *varchar*,
>>   prodLeafType *varchar*,
>>   prodRisk *varchar*,
>>   prodRiskFlag *varchar*,
>>   prodRootType *varchar*,
>>   prodTerm *varchar*,
>>   prodVariety *varchar*,
>>   quaInvestorFlag *varchar*,
>>   quaInvestorSource *varchar*,
>>   quickPurchaseFlag *varchar*,
>>   remark *varchar*,
>>   remark1 *varchar*,
>>   remark2 *varchar*,
>>   remark3 *varchar*,
>>   riskFlag *varchar*,
>>   scRcvTime *varchar*,
>>   scSendTime *varchar*,
>>   signId *varchar*,
>>   signSpecialRiskFlag *varchar*,
>>   source *varchar*,
>>   *status** varchar*,
>>   subRiskFlag *varchar*,
>>   sysNodeId *varchar*,
>>   taSerialNo *varchar*,
>>   termFlag *varchar*,
>>   token *varchar*,
>>   tradeConfirmDate *varchar*,
>>   transFundCode *varchar*,
>>   transProdId *varchar*,
>>   varietyFlag *varchar*,
>>   zlcftProdType *varchar*,
>>   proctime *as *PROCTIME()
>> *-- 通过计算列产生一个处理时间列*)
>>
>> *with*(
>>   *'connector.type' *= *'kafka'*,
>> *-- 使用 kafka connector  **'connector.version' *= *'0.10'*,
>> *-- kafka 版本,universal 支持 0.11 以上的版本  **'connector.topic' *= *''*,
>>
>> *-- kafka topic  **'connector.startup-mode' *= *'group-offsets'*,
>> *-- 从起始 offset 开始读取  **'connector.properties.zookeeper.connect' *=
>> *''*,
>> *-- zookeeper 地址  **'connector.properties.bootstrap.servers' *=
>> *''*,
>> *-- kafka broker 地址  **'connector.properties.group.id
>> ' *=
>> *'acrm-realtime-saleorder-consumer-1'*,
>>   *'format.type' *= *'json'  *
>> *-- 数据源格式为 json*)
>>
>>
>> *CREATE TABLE **dim_app_cust_info *(
>> cust_id *varchar *,
>> open_comp_name *varchar *,
>> open_comp_id *varchar *,
>> org_name *varchar *,
>> org_id *varchar*,
>> comp_name *varchar *,
>> comp_id *varchar *,
>> mng_name *varchar *,
>> mng_id *varchar *,
>> is_tg *varchar *,
>> cust_name *varchar *,
>> cust_type *varchar*,
>> avg_tot_aset_y365 *double *,
>> avg_aset_create_y
>> *double*) *WITH *(
>> *'connector.type' *= *'jdbc'*,
>> *'connector.url' *= *''*,
>> *'connector.table' *= *'app_cust_serv_rel_info'*,
>> *'connector.driver' *= *'com.mysql.jdbc.Driver'*,
>> *'connector.username' *= *'admin'*,
>> *'connector.password' *= *'Windows7'*,
>> *'connector.lookup.cache.max-rows' *= *'8000'*,
>> *'connector.lookup.cache.ttl' *= *'30min'*,
>> *'connector.lookup.max-retries' *=
>> *'3'*)
>>
>>
>>
>> At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>> >Could you also provide us the DDL for lscsp_sc_order_all
>> >and dim_app_cust_info ?
>> >
>> >sunfulin  于2020年3月1日周日 下午9:22写道:
>> >
>> >>
>> >> *CREATE TABLE **realtime_product_sell *(
>> >>   sor_pty_id *varchar*,
>> >>   entrust_date *varchar*,
>> >>   entrust_time *varchar*,
>> >>   product_code *varchar *,
>> >>   business_type *varchar *,
>> >>   balance *double *,
>> >>   cust_name *varchar *,
>> >>   open_comp_name *varchar *,
>> >>   open_comp_id *varchar *,
>> >>   org_name *varchar *,
>> >>   org_id *varchar *,
>> >>   comp_name *varchar *,
>> >>   comp_id *varchar *,
>> >>   mng_name *varchar *,
>> >>   mng_id *varchar *,
>> >>   is_tg *varchar *,
>> >>   cust_type *varchar *,
>> >>   avg_tot_aset_y365 *double *,
>> >>   avg_aset_create_y
>> >> *double*) *WITH *(
>> >> *'connector.type' *= *'elasticsearch'*,
>> >> *'connector.version' *= *''*,
>> >> *'connector.hosts' *= 


Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Piotr Nowojski
Hi,

Sorry for my previous slightly confusing response; please take a look at the 
response from Gordon.

Piotrek
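
(For readers of this thread: stop-with-savepoint as discussed below is triggered
from the CLI roughly like this - the job id, jar and savepoint path are
placeholders:)

    # stop the job, taking a savepoint first (Flink 1.9+)
    ./bin/flink stop -p hdfs:///flink/savepoints <jobId>

    # later, resume the (possibly re-generated) job from that savepoint
    ./bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> my-job.jar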

> On 2 Mar 2020, at 12:05, Kaymak, Tobias  wrote:
> 
> Hi,
> 
> let me refine my question: My pipeline is generated from Beam, so the Flink 
> pipeline is a translated Beam pipeline. When I update my Apache Beam pipeline 
> code, working with a snapshot in Flink to stop the pipeline is not an option, 
> as the snapshot will use the old representation of the Flink pipeline 
> when resuming from that snapshot.
> 
> Meaning that I am looking for a way to drain the pipeline cleanly and using 
> the last committed offset in Kafka to resume processing after I started it 
> again (launching it through Beam will regenerate the Flink pipeline and it 
> should resume at the offset where it left off, that is the latest committed 
> offset in Kafka).
> 
> Can this be achieved with a cancel or stop of the Flink pipeline?
> 
> Best,
> Tobias
> 
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski  > wrote:
> Hi Tobi,
> 
> No, FlinkKafkaConsumer does not use the offsets committed to Kafka for recovery. 
> The offsets to start from are stored in the checkpoint itself. Writing the 
> offsets back to Kafka is optional and purely cosmetic from Flink’s 
> perspective, so the job will still start from the correct offsets.
> 
> However, if you, for whatever reason, restart the job from a 
> savepoint/checkpoint that’s not the latest one, this will violate 
> exactly-once guarantees - some records will be committed twice 
> in the sinks, as they would simply be processed and committed 
> again. Committing happens on checkpoints, so if you are recovering to some 
> previous checkpoint, there is nothing Flink can do - some records were 
> already committed before.
> 
> Piotrek
> 
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias > > wrote:
>> 
>> Thank you Piotr!
>> 
>> One last question - let's assume my source is a Kafka topic - if I stop via 
>> the CLI with a savepoint in Flink 1.9, but do not use that savepoint when 
>> restarting my job - the job would continue from the last offset that has 
>> been committed in Kafka and thus I would also not experience a loss of data 
>> in my sink. Is that correct?
>> 
>> Best,
>> Tobi
>> 
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski > > wrote:
>> Yes, that’s correct. There shouldn’t be any data loss. Stop with savepoint 
>> is a way to make sure that, if you are stopping a job (either 
>> permanently or temporarily), all of the results are published/committed 
>> to external systems before you actually stop the job. 
>> 
>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint was 
>> completing at the time the cluster was crashing), some records might not be 
>> committed before the cancellation/kill/crash happened. Also note that this 
>> doesn’t mean there is data loss - those records will simply be published once 
>> you restore your job from a checkpoint. If you stop the job 
>> permanently, however, that restore never happens, hence the need for stop 
>> with savepoint.
>> 
>> Piotrek
>> 
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias >> > wrote:
>>> 
>>> Thank you! For understanding the matter: When I have a streaming pipeline 
>>> (reading from Kafka, writing somewhere) and I click "cancel" and after that 
>>> I restart the pipeline - I should not expect any data to be lost - is that 
>>> correct?
>>> 
>>> Best,
>>> Tobias 
>>> 
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski >> > wrote:
>>> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
>>> 
>>> Piotrek
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-16340 
>>> 
>>> 
 On 28 Feb 2020, at 14:32, Yadong Xie >>> > wrote:
 
 Hi
 
 1. the old stop button was removed in Flink 1.9.0 since, as far as I know, it 
 could not work properly
 2. if we have the stop-with-savepoint feature, we could add it to 
 the web UI, but it may still need some work on the REST API to support the 
 new feature
 
 
 Best,
 Yadong
 
 
 Piotr Nowojski mailto:pi...@ververica.com>> 
 于2020年2月28日周五 下午8:49写道:
 Hi,
 
 I’m not sure. Maybe Yadong (CC) will know more, but to the best of my 
 knowledge and research:
 
 1. In Flink 1.9 we switched from the old webUI to a new one, that probably 
 explains the difference you are seeing.
 2. The “Stop” button in the old webUI was not working properly - that was 
 not stop with savepoint, as stop with savepoint is a relatively new 
 feature.
 3. Now that we have stop with savepoint (it can be used from CLI as you 
 wrote), probably we could expose this feature in the new UI as well, 
 unless it’s 

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread Benchao Li
Hi fulin,

I cannot reproduce your exception on current master using your SQLs. I
searched the error message; it seems that this issue[1] is similar to
yours, but it seems that the current compile util does not have this issue.

BTW, are you using 1.10?

[1] https://issues.apache.org/jira/browse/FLINK-7490

sunfulin  于2020年3月2日周一 上午11:17写道:

>
>
>
> *create table **lscsp_sc_order_all *(
>   amount *varchar  *,
>   argType *varchar*,
>   balance *varchar*,
>   branchNo *varchar  *,
>   businessType *varchar *,
>   channelType *varchar *,
>   counterOrderNo *varchar  *,
>   counterRegisteredDate *varchar*,
>   custAsset *varchar  *,
>   customerNumber *varchar*,
>   customerType *varchar*,
>   discountId *varchar*,
>   doubleRecordFlag *varchar*,
>   doubleRecordType *varchar*,
>   exceedFlag *varchar*,
>   fundAccount *varchar*,
>   fundCode *varchar*,
>   fundCompany *varchar*,
>   fundName *varchar*,
>   fundRecruitmentFlag *varchar*,
>   id *varchar*,
>   lastUpdateTime *varchar*,
>   opBranchNo *varchar*,
>   opStation *varchar*,
>   orderNo *varchar*,
>   orgEntrustNo *varchar*,
>   orgOrderNo *varchar*,
>   prodId *varchar*,
>   prodInvestorType *varchar*,
>   prodLeafType *varchar*,
>   prodRisk *varchar*,
>   prodRiskFlag *varchar*,
>   prodRootType *varchar*,
>   prodTerm *varchar*,
>   prodVariety *varchar*,
>   quaInvestorFlag *varchar*,
>   quaInvestorSource *varchar*,
>   quickPurchaseFlag *varchar*,
>   remark *varchar*,
>   remark1 *varchar*,
>   remark2 *varchar*,
>   remark3 *varchar*,
>   riskFlag *varchar*,
>   scRcvTime *varchar*,
>   scSendTime *varchar*,
>   signId *varchar*,
>   signSpecialRiskFlag *varchar*,
>   source *varchar*,
>   *status** varchar*,
>   subRiskFlag *varchar*,
>   sysNodeId *varchar*,
>   taSerialNo *varchar*,
>   termFlag *varchar*,
>   token *varchar*,
>   tradeConfirmDate *varchar*,
>   transFundCode *varchar*,
>   transProdId *varchar*,
>   varietyFlag *varchar*,
>   zlcftProdType *varchar*,
>   proctime *as *PROCTIME()
> *-- 通过计算列产生一个处理时间列*)
>
> *with*(
>   *'connector.type' *= *'kafka'*,
> *-- 使用 kafka connector  **'connector.version' *= *'0.10'*,
> *-- kafka 版本,universal 支持 0.11 以上的版本  **'connector.topic' *= *''*,
>
> *-- kafka topic  **'connector.startup-mode' *= *'group-offsets'*,
> *-- 从起始 offset 开始读取  **'connector.properties.zookeeper.connect' *=
> *''*,
> *-- zookeeper 地址  **'connector.properties.bootstrap.servers' *=
> *''*,
> *-- kafka broker 地址  **'connector.properties.group.id
> ' *=
> *'acrm-realtime-saleorder-consumer-1'*,
>   *'format.type' *= *'json'  *
> *-- 数据源格式为 json*)
>
>
> *CREATE TABLE **dim_app_cust_info *(
> cust_id *varchar *,
> open_comp_name *varchar *,
> open_comp_id *varchar *,
> org_name *varchar *,
> org_id *varchar*,
> comp_name *varchar *,
> comp_id *varchar *,
> mng_name *varchar *,
> mng_id *varchar *,
> is_tg *varchar *,
> cust_name *varchar *,
> cust_type *varchar*,
> avg_tot_aset_y365 *double *,
> avg_aset_create_y
> *double*) *WITH *(
> *'connector.type' *= *'jdbc'*,
> *'connector.url' *= *''*,
> *'connector.table' *= *'app_cust_serv_rel_info'*,
> *'connector.driver' *= *'com.mysql.jdbc.Driver'*,
> *'connector.username' *= *'admin'*,
> *'connector.password' *= *'Windows7'*,
> *'connector.lookup.cache.max-rows' *= *'8000'*,
> *'connector.lookup.cache.ttl' *= *'30min'*,
> *'connector.lookup.max-retries' *=
> *'3'*)
>
>
>
> At 2020-03-02 09:16:05, "Benchao Li"  wrote:
> >Could you also provide us the DDL for lscsp_sc_order_all
> >and dim_app_cust_info ?
> >
> >sunfulin  于2020年3月1日周日 下午9:22写道:
> >
> >>
> >> *CREATE TABLE **realtime_product_sell *(
> >>   sor_pty_id *varchar*,
> >>   entrust_date *varchar*,
> >>   entrust_time *varchar*,
> >>   product_code *varchar *,
> >>   business_type *varchar *,
> >>   balance *double *,
> >>   cust_name *varchar *,
> >>   open_comp_name *varchar *,
> >>   open_comp_id *varchar *,
> >>   org_name *varchar *,
> >>   org_id *varchar *,
> >>   comp_name *varchar *,
> >>   comp_id *varchar *,
> >>   mng_name *varchar *,
> >>   mng_id *varchar *,
> >>   is_tg *varchar *,
> >>   cust_type *varchar *,
> >>   avg_tot_aset_y365 *double *,
> >>   avg_aset_create_y
> >> *double*) *WITH *(
> >> *'connector.type' *= *'elasticsearch'*,
> >> *'connector.version' *= *''*,
> >> *'connector.hosts' *= *''*,
> >> *'connector.index' *= *'realtime_product_sell_007118'*,
> >> *'connector.document-type' *= *'_doc'*,
> >> *'update-mode' *= *'upsert'*,
> >> *'connector.key-delimiter' *= *'$'*,
> >> *'connector.key-null-literal' *= *'n/a'*,
> >> *'connector.bulk-flush.interval' *= *'1000'*,
> >> *'format.type' *=
> >> *'json'*)
> >>
> >>
> >>
> >>
> >>
> >> At 2020-03-01 21:08:08, "Benchao Li"  wrote:
> >> >The UDF looks good. Could you also paste your DDL? Then we can produce 
> >> >your
> >> >bug easily.
> >> >
> >> >sunfulin  于2020年3月1日周日 下午6:39写道:
> >> >
> >> >> Below is the code. The function trans origin field 

Re: Flink on AWS - ActiveMQ connector

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

The connectors that are listed in the AWS documentation page that you
referenced are not provided by AWS. They are bundled connectors shipped by
the Apache Flink community as part of official Flink releases, and are
discoverable as artifacts from the Maven central repository. See the
respective Flink connector documentation pages (for example [1] for Flink's
Apache Kafka connector) on how to use those connectors in your jobs.

As for the ActiveMQ connector provided by Apache Bahir, there's also a Maven
artifact for that shipped by Apache Bahir [2].

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How is state stored in rocksdb?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

First of all, state is only managed by Flink (and therefore Flink's state
backends) if the state is registered by the user.
You can take a look at the documents here [1] on details on how to register
state.
A state has to be registered for it to be persisted in checkpoints /
savepoints, and be fault-tolerant across Flink job restarts.

As for your second part of your question on serialization:
Once you take a look at how to register state to be managed by Flink, you'd
quickly realize that you can specify the serializer for registered state. If
you simply provide the class of the state data type, then Flink will use its
own type extraction to figure out the serializer to use for the type. Please
see [2] for details on that. Otherwise, a custom serializer implementation
can be provided.
In general, you can find quite a bit about state serialization here [3].
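
As a minimal illustration of what registering keyed state looks like (a sketch
with assumed names, not code from this thread):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountPerKey extends RichFlatMapFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Registered state is persisted by the configured state backend (e.g. RocksDB)
            // and included in checkpoints/savepoints, using the serializer derived from
            // the type information given here.
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long current = count.value(); // null before the first update for this key
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(updated);
        }
    }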

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Yang Wang
From 1.10 on, Flink enables a metaspace limit via "-XX:MaxMetaspaceSize"
by default. The default value is 96m; loading too many classes will cause
"OutOfMemoryError: Metaspace" [1]. So you need to increase the configured
value.


[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_trouble.html#outofmemoryerror-metaspace
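
For reference, with the 1.10 memory model the limit can be raised in flink-conf.yaml, for example (the size below is only an example, tune it to your job):

taskmanager.memory.jvm-metaspace.size: 256m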


Best,
Yang

Niels Basjes wrote on Mon, Mar 2, 2020 at 7:16 PM:

> Hi,
>
> I'm running a lot of batch jobs on Kubernetes once in a while I get this
> exception.
> What is causing this?
> How can I fix this?
>
> Niels Basjes
>
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
> 142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
> ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
> 142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at org.apache.flink.util.ChildFirstClassLoader.loadClass(
> ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at org.apache.logging.log4j.LogManager.(LogManager.java:60)
> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
> ESLoggerFactory.java:45)
> at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
> ESLoggerFactory.java:53)
> at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:104
> )
> at org.elasticsearch.common.unit.ByteSizeValue.(ByteSizeValue
> .java:39)
> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
> BulkProcessor.java:88)
> at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
> BulkProcessor.java:80)
> at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor
> .java:174)
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Correct way to e2e test a Flink application?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Laurent,

You can take a look at Flink's MiniClusterResource JUnit test rule, and its
usages in the codebase for that.
The rule launches a Flink MiniCluster within the same JVM, and submission to
the mini cluster resembles how it would be submitting to an actual Flink
cluster, so you would already be able to catch problems such as operator
serialization errors.
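
For example, a minimal JUnit sketch along those lines (class names as provided by flink-test-utils; the job body is only a placeholder):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyPipelineITCase {

    // starts a local Flink MiniCluster once for all tests in this class
    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void pipelineRunsOnMiniCluster() throws Exception {
        // getExecutionEnvironment() picks up the environment registered by the rule
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(1, 2, 3).print();
        env.execute("mini-cluster-smoke-test");
    }
}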

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question about runtime filter

2020-03-02 Thread faaron zheng
I set sql.exec.runtime-filter.wait to true. The HiveTableSource takes much
longer but produces the same result. I think the reason is that the
preAggregatedAccumulator is never committed, but I don't know why that happens.

JingsongLee wrote on Mon, Mar 2, 2020 at 3:22 PM:

> Hi,
>
> Does runtime filter probe side wait for building runtime filter?
> Can you check the start time of build side and probe side?
>
> Best,
> Jingsong Lee
>
> --
> From:faaron zheng 
> Send Time: Mon, Mar 2, 2020, 14:55
> To:user 
> Subject:Question about runtime filter
>
> Hi, everyone
>
> These days, I am trying to implement runtime filter in Flink 1.10 with
> flink-sql-benchmark, following Blink. I mainly changed three parts of the
> Flink code: added a runtime filter rule; modified the code gen and the bloom
> filter; and added some aggregated-accumulator methods modeled on the existing
> accumulator. Now the runtime filter seems to work in the execution graph as follows:
> Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
> i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
> i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
> i_manufact, i_size, i_formulation, i_color, i_units, i_container,
> i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
> -> Calc(select=[i_item_sk], where=[((i_category =
> _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
> (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])
>
> and
>
> Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
> d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
> d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
> d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
> d_same_day_lq, d_current_day, d_current_week, d_current_month,
> d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])
>
>
> However, the number of records sent is the same as normal. Can anyone
> give me some advice?
>
>
>
> Thanks
>
>


How is state stored in rocksdb?

2020-03-02 Thread kant kodali
Hi All,

I am wondering how Flink serializes and deserializes state from rockdb?
What is the format used?

For example, say I am doing some stateful streaming and an object of my
class below represents the state. How does Flink serialize and
deserialize an object of MyClass? Is it just Java or Kryo
serialization? If so, is the state queryable?

public class MyClass {
private Map map1 = new HashMap<>();
private Map map2 = new HashMap<>();
}

Thanks!


Correct way to e2e test a Flink application?

2020-03-02 Thread Laurent Exsteens
Hello,

I would like to test a Flink application, including any problem that would
happen when deployed on a distributed cluster.

The way we do this currently is to launch a Flink cluster in Docker and run
the job on it. This setup seems heavy and might not be necessary.

Is there a way to simulate a Flink cluster as if it were distributed
(including serialisation and static-class behavior on such a cluster)
using, for example, a Flink MiniCluster? If yes, are there any resources
available on how to set up a correct test environment?

Thanks in advance for your help.

Regards,

Laurent.

-- 
♻ Be green, keep it on the screen


Re: Flink Session Window to enrich Event with unique id

2020-03-02 Thread aj
Hi,
Is using the session window to implement the above logic a good idea, or
should I use a process function?

On Sun, Mar 1, 2020 at 11:39 AM aj  wrote:

> Hi ,
>
> I am working on a use case where I have a stream of events. I want to
> attach a unique id to all the events that happened in a session.
> Below is the logic that I am trying to implement:
>
> 1. session_started
> 2. whenever an event_name=search occurs, generate a unique search_id and attach
> this id to all the following events in the session until a new "search" event
> is encountered in the session.
>
> Example :
> *user-1.  session-1   event_name- search (generate searchid --1)*
> user-1.  session-1   event_name- x  (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> *user-1.  session-1   event_name- search (generate searchid --2)*
> user-1.  session-1   event_name- x  (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
>
> As events can come out of order, I want to do this after the session window
> is over. So after the session window I am doing the following:
>
> 1. sort all the events by time.
> 2. iterate over each event and attach the search_id.
> 3. collect all the events and emit another stream with the enriched search_id.
>
> I am trying the code below, but it's not working as expected and I am not
> able to understand what is happening.
>
> dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>     StringBuilder builder = new StringBuilder();
>     builder.append(record.get("session_id"));
>     builder.append(record.get("user_id"));
>     return builder.toString();
> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>   .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
>       @Override
>       public void process(String key, Context context,
>                           Iterable<GenericRecord> iterable,
>                           Collector<GenericRecord> collector) throws Exception {
>           Stream<GenericRecord> result = IterableUtils.toStream(iterable);
>           List<GenericRecord> s = result.collect(Collectors.toList());
>           Map<Long, GenericRecord> recordMap = new HashMap<>();
>           for (GenericRecord record : s) {
>               recordMap.put((long) record.get("event_ts"), record);
>           }
>           Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
>           recordMap.entrySet().stream()
>               .sorted(Map.Entry.comparingByKey())
>               .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>           String search_id = null;
>           for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
>               GenericRecord record = element.getValue();
>               if (record.get("event_name").equals("search")) {
>                   search_id = UUID.randomUUID().toString();
>               }
>               record.put("search_id", search_id);
>               collector.collect(record);
>           }
>       }
>   }).print();
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
> 
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






[Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Niels Basjes
Hi,

I'm running a lot of batch jobs on Kubernetes, and once in a while I get this
exception.
What is causing this?
How can I fix this?

Niels Basjes

java.lang.OutOfMemoryError: Metaspace
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(
ChildFirstClassLoader.java:60)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(
ChildFirstClassLoader.java:60)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at org.apache.logging.log4j.LogManager.(LogManager.java:60)
at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
ESLoggerFactory.java:45)
at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(
ESLoggerFactory.java:53)
at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:104)
at org.elasticsearch.common.unit.ByteSizeValue.(ByteSizeValue
.java:39)
at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
BulkProcessor.java:88)
at org.elasticsearch.action.bulk.BulkProcessor$Builder.(
BulkProcessor.java:80)
at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor
.java:174)

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Flink on AWS - ActiveMQ connector

2020-03-02 Thread KristoffSC
Hi all,
In the AWS documentation [1] we can see that AWS provides a set of connectors
for Flink. I would need to use the ActiveMQ one provided by [2]. Currently
I'm using a Docker-based standalone Job Cluster, not an AWS one.

What's the deal with the connectors provided by AWS? Will I be able to use my
connector on AWS? I assume that its JAR will be part of the job JAR.


[1] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/#connectors-in-apache-bahir



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Hi,

let me refine my question: My pipeline is generated from Beam, so the Flink
pipeline is a translated Beam pipeline. When I update my Apache Beam
pipeline code, working with a snapshot in Flink to stop the pipeline is not
an option, as the snapshot will use the old representation of the the Flink
pipeline when resuming from that snapshot.

Meaning that I am looking for a way to drain the pipeline cleanly and using
the last committed offset in Kafka to resume processing after I started it
again (launching it through Beam will regenerate the Flink pipeline and it
should resume at the offset where it left of, that is the latest committed
offset in Kafka).

Can this be achieved with a cancel or stop of the Flink pipeline?

Best,
Tobias

On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski  wrote:

> Hi Tobi,
>
> No, FlinkKafkaConsumer is not using committed Kafka’s offsets for
> recovery. Offsets where to start from are stored in the checkpoint itself.
> Updating the offsets back to Kafka is an optional, purely cosmetic thing
> from the Flink’s perspective, so the job will start from the correct
> offsets.
>
> However, if you for whatever the reason re-start the job from a
> savepoint/checkpoint that’s not the latest one, this will violate
> exactly-once guarantees - there will be some duplicated records committed
> two times in the sinks, as simply some records would be processed and
> committed twice. Committing happens on checkpoint, so if you are recovering
> to some previous checkpoint, there is nothing Flink can do - some records
> were already committed before.
>
> Piotrek
>
> On 2 Mar 2020, at 10:12, Kaymak, Tobias  wrote:
>
> Thank you Piotr!
>
> One last question - let's assume my source is a Kafka topic - if I stop
> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
> when restarting my job - the job would continue from the last offset that
> has been committed in Kafka and thus I would also not experience a loss of
> data in my sink. Is that correct?
>
> Best,
> Tobi
>
> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski 
> wrote:
>
>> Yes, that’s correct. There shouldn’t be any data loss. Stop with
>> savepoint is a solution to make sure, that if you are stopping a job
>> (either permanently or temporarily) that all of the results are
>> published/committed to external systems before you actually stop the job.
>>
>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
>> was completing at the time cluster was crashing), some records might not be
>> committed before the cancellation/kill/crash happened. Also note that
>> doesn’t mean there is a data loss, just those records will be published
>> once you restore your job from a checkpoint. If you want to stop the job
>> permanently, that might not happen, hence we need stop with savepoint.
>>
>> Piotrek
>>
>> On 28 Feb 2020, at 15:02, Kaymak, Tobias 
>> wrote:
>>
>> Thank you! For understanding the matter: When I have a streaming pipeline
>> (reading from Kafka, writing somewhere) and I click "cancel" and after that
>> I restart the pipeline - I should not expect any data to be lost - is that
>> correct?
>>
>> Best,
>> Tobias
>>
>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski 
>> wrote:
>>
>>> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>>
>>> On 28 Feb 2020, at 14:32, Yadong Xie  wrote:
>>>
>>> Hi
>>>
>>> 1. the old stop button was removed in flink 1.9.0 since it could not
>>> work properly as I know
>>> 2. if we have the feature of the stop with savepoint, we could add it to
>>> the web UI, but it may still need some work on the rest API to support the
>>> new feature
>>>
>>>
>>> Best,
>>> Yadong
>>>
>>>
>>> Piotr Nowojski wrote on Fri, Feb 28, 2020 at 8:49 PM:
>>>
 Hi,

 I’m not sure. Maybe Yadong (CC) will know more, but to the best of my
 knowledge and research:

 1. In Flink 1.9 we switched from the old webUI to a new one, that
 probably explains the difference you are seeing.
 2. The “Stop” button in the old webUI, was not working properly - that
 was not stop with savepoint, as stop with savepoint is a relatively new
 feature.
 3. Now that we have stop with savepoint (it can be used from CLI as you
 wrote), probably we could expose this feature in the new UI as well, unless
 it’s already exposed somewhere? Yadong, do you know an answer for that?

 Piotrek

 On 27 Feb 2020, at 13:31, Kaymak, Tobias 
 wrote:

 Hello,

 before Flink 1.9 I was able to "Stop" a streaming pipeline - after
 clicking that button in the webinterface it performed a clean shutdown. Now
 with Flink 1.9 I just see the option to cancel it.

 However, using the commandline flink stop -d
 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the
 functionality is there.

 Has the button been removed on purpose?

 

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
I didn't get the use case completely. Are you using several sensors with
different schemas? Are you processing them jointly?

Let's assume some cases:
1) Only one format, it would be best to generate a case class with
avrohugger. That is especially true if you processing actually requires
specific fields to be present.
2) Several sensors, but processed independently. You could do the same as
1) for all sensors. If you don't need to access specific fields, you should
fetch the latest schema in your main() and all the things that Flink
provides.
3) You have constantly changing schemas and want to forward records always
with the latest schema enriched with some fields. You need to stick to
GenericRecord. I'd go with the byte[] approach of my first response if you
only have one such application / processing step (a sketch follows below).
4) Else go with the custom TypeInfo/Serializer. We can help you to
implement it. If you can do it yourself, it'd be awesome to put it as a
response here for other users.
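
As a rough illustration of the byte[] approach from 3), here is a sketch of a map function that only materializes the GenericRecord inside the UDF. It reuses the Confluent classes from the earlier snippet; details such as automatic schema registration may need adjusting for your setup, so treat it as a starting point rather than a drop-in solution:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class GenericEnrichment extends RichMapFunction<byte[], byte[]> {

    private final String registryUrl;
    private final String topic;
    private transient KafkaAvroDeserializer deserializer;
    private transient KafkaAvroSerializer serializer;

    public GenericEnrichment(String registryUrl, String topic) {
        this.registryUrl = registryUrl;
        this.topic = topic;
    }

    @Override
    public void open(Configuration parameters) {
        // the Confluent (de)serializers are not serializable, so create them per task in open()
        Map<String, Object> config = new HashMap<>();
        config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false);
        serializer = new KafkaAvroSerializer();
        serializer.configure(config, false);
    }

    @Override
    public byte[] map(byte[] value) {
        GenericRecord record = (GenericRecord) deserializer.deserialize(topic, value);
        // ... enrich the record here, whatever schema version it carries ...
        return serializer.serialize(topic, record);
    }
}

The stream then stays DataStream<byte[]> end to end, so Flink never needs a serializer for GenericRecord itself.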

On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant  wrote:

> Hi,
>
> So I am building a data pipeline that takes input from sensors via MQTT
> broker and passes it to kafka. Before it goes to kafka, I am filtering and
> serializing the filtered data into avro format and keeping the schema in
> the registry. Now I want to get that data in flink to process it using some
> algorithms. So, at the flinkKafkaConsumer end, I currently don’t have the
> schemas for my data. One work around for me would be to get the schema
> corresponding the data that I’ll be getting from a topic separately from
> the registry and then work forward, but I was hoping there would a way to
> avoid this and integrate the schema registry with my consumer in some way
> like kafka-connect does. This is why I was trying this solution.
>
> Do you think I should maybe do the work around method as implementing a
> GenericRecord would be more of a overhead in the longer run?
>
> Thanks!
>
>
> On 02-Mar-2020, at 3:11 PM, Arvid Heise  wrote:
>
> Could you please give more background on your use case? It's hard to give
> any advice with the little information you gave us.
>
> Usually, the consumer should know the schema or else it's hard to do
> meaningful processing.
> If it's something completely generic, then there is no way around it, but
> that should be the last straw. Here my recommendations from my first
> response would come into play.
>
> If they are not working for you for some reason, please let me know why
> and I could come up with a solution.
>
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant 
> wrote:
>
>> Hi,
>>
>> Thanks for the replies. I get that it is not wise to use GenericRecord
>> and that is what is causing the Kryo fallback, but then if not this, how
>> should I go about writing a AvroSchemaRegistrySchema for when I don’t know
>> the schema. Without the knowledge of schema, I can’t create a class. Can
>> you suggest a way of getting around that?
>>
>> Thanks!
>>
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz 
>> wrote:
>>
>> Hi Nitish,
>>
>> Just to slightly extend on Arvid's reply. As Arvid said the Kryo
>> serializer comes from the call to 
>> TypeExtractor.getForClass(classOf[GenericRecord]).
>> As a GenericRecord is not a pojo this call will produce a GenericTypeInfo
>> which uses Kryo serialization.
>>
>> For a reference example I would recommend having a look at
>> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
>> working with GenericRecords. One important note. GenericRecords are not the
>> best candidates for a data objects in Flink. The reason is if you apply any
>> transformation on a GenericRecord e.g. map/flatMap. The input type
>> information cannot be forwarded as the transformation is a black box from
>> Flink's perspective. Therefore you would need to provide the type
>> information for every step of the pipeline:
>>
>> TypeInformation info = ...
>>
>> sEnv.addSource(...) // produces info
>>
>> .map(...)
>>
>> .returns(info) // must be provided again, as the map transformation is a
>> black box, the transformation might produce a completely different record
>>
>> Hope that helps a bit.
>>
>> Best,
>>
>> Dawid
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>
>> Hi Nitish,
>>
>> Kryo is the fallback serializer of Flink when everything else fails. In
>> general, performance suffers quite a bit and it's not always applicable as
>> in your case. Especially, in production code, it's best to avoid it
>> completely.
>>
>> In your case, the issue is that your provided type information is
>> completely meaningless. getProducedType is not providing any actual type
>> information but just references to a generic skeleton. Flink uses the type
>> information to reason about the value structures, which it cannot in your
>> case.
>>
>> If you really need to resort to a completely generic serializer (which is
>> usually not needed), then you have a few options:
>> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that
>> 

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Piotr Nowojski
Hi Tobi,

No, FlinkKafkaConsumer is not using committed Kafka’s offsets for recovery. 
Offsets where to start from are stored in the checkpoint itself. Updating the 
offsets back to Kafka is an optional, purely cosmetic thing from the Flink’s 
perspective, so the job will start from the correct offsets.

However, if you for whatever the reason re-start the job from a 
savepoint/checkpoint that’s not the latest one, this will violate exactly-once 
guarantees - there will be some duplicated records committed two times in the 
sinks, as simply some records would be processed and committed twice. 
Committing happens on checkpoint, so if you are recovering to some previous 
checkpoint, there is nothing Flink can do - some records were already committed 
before.

Piotrek

> On 2 Mar 2020, at 10:12, Kaymak, Tobias  wrote:
> 
> Thank you Piotr!
> 
> One last question - let's assume my source is a Kafka topic - if I stop via 
> the CLI with a savepoint in Flink 1.9, but do not use that savepoint when 
> restarting my job - the job would continue from the last offset that has been 
> committed in Kafka and thus I would also not experience a loss of data in my 
> sink. Is that correct?
> 
> Best,
> Tobi
> 
> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski  > wrote:
> Yes, that’s correct. There shouldn’t be any data loss. Stop with savepoint is 
> a solution to make sure, that if you are stopping a job (either permanently 
> or temporarily) that all of the results are published/committed to external 
> systems before you actually stop the job. 
> 
> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint was 
> completing at the time cluster was crashing), some records might not be 
> committed before the cancellation/kill/crash happened. Also note that doesn’t 
> mean there is a data loss, just those records will be published once you 
> restore your job from a checkpoint. If you want to stop the job permanently, 
> that might not happen, hence we need stop with savepoint.
> 
> Piotrek
> 
>> On 28 Feb 2020, at 15:02, Kaymak, Tobias > > wrote:
>> 
>> Thank you! For understanding the matter: When I have a streaming pipeline 
>> (reading from Kafka, writing somewhere) and I click "cancel" and after that 
>> I restart the pipeline - I should not expect any data to be lost - is that 
>> correct?
>> 
>> Best,
>> Tobias 
>> 
>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski > > wrote:
>> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
>> 
>> Piotrek
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-16340 
>> 
>> 
>>> On 28 Feb 2020, at 14:32, Yadong Xie >> > wrote:
>>> 
>>> Hi
>>> 
>>> 1. the old stop button was removed in flink 1.9.0 since it could not work 
>>> properly as I know
>>> 2. if we have the feature of the stop with savepoint, we could add it to 
>>> the web UI, but it may still need some work on the rest API to support the 
>>> new feature
>>> 
>>> 
>>> Best,
>>> Yadong
>>> 
>>> 
>>> Piotr Nowojski mailto:pi...@ververica.com>> 
>>> wrote on Fri, Feb 28, 2020 at 8:49 PM:
>>> Hi,
>>> 
>>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my 
>>> knowledge and research:
>>> 
>>> 1. In Flink 1.9 we switched from the old webUI to a new one, that probably 
>>> explains the difference you are seeing.
>>> 2. The “Stop” button in the old webUI, was not working properly - that was 
>>> not stop with savepoint, as stop with savepoint is a relatively new feature.
>>> 3. Now that we have stop with savepoint (it can be used from CLI as you 
>>> wrote), probably we could expose this feature in the new UI as well, unless 
>>> it’s already exposed somewhere? Yadong, do you know an answer for that?
>>> 
>>> Piotrek
>>> 
 On 27 Feb 2020, at 13:31, Kaymak, Tobias >>> > wrote:
 
 Hello,
 
 before Flink 1.9 I was able to "Stop" a streaming pipeline - after 
 clicking that button in the webinterface it performed a clean shutdown. 
 Now with Flink 1.9 I just see the option to cancel it. 
 
 However, using the commandline flink stop -d 
 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the 
 functionality is there. 
 
 Has the button been removed on purpose?
 
 Best,
 Tobias
>>> 
>> 
>> 
>> 
>> -- 
>> 
>> Tobias Kaymak
>> Data Engineer
>> Data Intelligence
>> 
>> tobias.kay...@ricardo.ch 
>> www.ricardo.ch 
>> Theilerstrasse 1a, 6300 Zug
>> 
> 



Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li Tai
Hi Tobi,

In this case, the job would indeed continue from the last offset that has
been committed in Kafka (assuming that you are using the
`startFromGroupOffsets` start position) for the specified group id.
However, do keep in mind that those offsets are not consistent with the
offsets written in Flink savepoints that are used for exactly-once
processing guarantees. Therefore, they do not provide any guarantees
against data loss.
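
For reference, a minimal sketch of that consumer setting (topic, group id and broker address are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// only consulted when the job is NOT restored from a checkpoint/savepoint;
// otherwise the offsets stored in the checkpoint/savepoint take precedence
consumer.setStartFromGroupOffsets();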

BR,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi,

So I am building a data pipeline that takes input from sensors via MQTT broker 
and passes it to kafka. Before it goes to kafka, I am filtering and serializing 
the filtered data into avro format and keeping the schema in the registry. Now 
I want to get that data in flink to process it using some algorithms. So, at 
the flinkKafkaConsumer end, I currently don’t have the schemas for my data. One 
work around for me would be to get the schema corresponding to the data that I’ll 
be getting from a topic separately from the registry and then work forward, but 
I was hoping there would be a way to avoid this and integrate the schema registry 
with my consumer in some way like kafka-connect does. This is why I was trying 
this solution.

Do you think I should maybe go with the workaround method, as implementing a 
GenericRecord would be more of an overhead in the long run?

Thanks!


> On 02-Mar-2020, at 3:11 PM, Arvid Heise  wrote:
> 
> Could you please give more background on your use case? It's hard to give any 
> advice with the little information you gave us.
> 
> Usually, the consumer should know the schema or else it's hard to do 
> meaningful processing.
> If it's something completely generic, then there is no way around it, but 
> that should be the last straw. Here my recommendations from my first response 
> would come into play.
> 
> If they are not working for you for some reason, please let me know why and I 
> could come up with a solution.
> 
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant  > wrote:
> Hi,
> 
> Thanks for the replies. I get that it is not wise to use GenericRecord and 
> that is what is causing the Kryo fallback, but then if not this, how should I 
> go about writing a AvroSchemaRegistrySchema for when I don’t know the schema. 
> Without the knowledge of schema, I can’t create a class. Can you suggest a 
> way of getting around that?
> 
> Thanks!
> 
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz > > wrote:
>> 
>> Hi Nitish,
>> 
>> Just to slightly extend on Arvid's reply. As Arvid said the Kryo serializer 
>> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As 
>> a GenericRecord is not a pojo this call will produce a GenericTypeInfo which 
>> uses Kryo serialization.
>> 
>> For a reference example I would recommend having a look at 
>> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for 
>> working with GenericRecords. One important note. GenericRecords are not the 
>> best candidates for a data objects in Flink. The reason is if you apply any 
>> transformation on a GenericRecord e.g. map/flatMap. The input type 
>> information cannot be forwarded as the transformation is a black box from 
>> Flink's perspective. Therefore you would need to provide the type 
>> information for every step of the pipeline:
>> 
>> TypeInformation info = ...
>> 
>> sEnv.addSource(...) // produces info
>> 
>> .map(...)
>> 
>> .returns(info) // must be provided again, as the map transformation is a 
>> black box, the transformation might produce a completely different record
>> 
>> Hope that helps a bit.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>> Hi Nitish,
>>> 
>>> Kryo is the fallback serializer of Flink when everything else fails. In 
>>> general, performance suffers quite a bit and it's not always applicable as 
>>> in your case. Especially, in production code, it's best to avoid it 
>>> completely.
>>> 
>>> In your case, the issue is that your provided type information is 
>>> completely meaningless. getProducedType is not providing any actual type 
>>> information but just references to a generic skeleton. Flink uses the type 
>>> information to reason about the value structures, which it cannot in your 
>>> case.
>>> 
>>> If you really need to resort to a completely generic serializer (which is 
>>> usually not needed), then you have a few options:
>>> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that 
>>> generic you probably have only a simple transformation before outputting it 
>>> into some generic Kafka sink. So your UDF deserializes, does some generic 
>>> stuff, and immediately turns it back into byte[].
>>> * Implement your own generic TypeInformation with serializer. 
>>> WritableTypeInfo [1] is a generic example on how to do it. This will 
>>> automatically convert byte[] back and forth to GenericRecord. That would be 
>>> the recommended way when you have multiple transformations before source 
>>> and sink.
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>>>  
>>> 
>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant >> 

A question about task exceptions

2020-03-02 Thread lucas.wu
Hi everyone,
I have recently been using Flink's built-in JDBC OutputFormat to write data processed by Flink to MySQL. However, if my data is malformed (for example, a value exceeds the size configured for the corresponding MySQL column), or the database has a problem that causes delays, the task throws an exception and fails, which in turn restarts the whole job from the checkpoint.
My question is: if I am using the OutputFormat provided by Flink, can I catch the exception and ignore it? If not, is there another good way to handle this?

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Could you please give more background on your use case? It's hard to give
any advice with the little information you gave us.

Usually, the consumer should know the schema or else it's hard to do
meaningful processing.
If it's something completely generic, then there is no way around it, but
that should be the last straw. Here my recommendations from my first
response would come into play.

If they are not working for you for some reason, please let me know why and
I could come up with a solution.

On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant  wrote:

> Hi,
>
> Thanks for the replies. I get that it is not wise to use GenericRecord and
> that is what is causing the Kryo fallback, but then if not this, how should
> I go about writing a AvroSchemaRegistrySchema for when I don’t know the
> schema. Without the knowledge of schema, I can’t create a class. Can you
> suggest a way of getting around that?
>
> Thanks!
>
> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz 
> wrote:
>
> Hi Nitish,
>
> Just to slightly extend on Arvid's reply. As Arvid said the Kryo
> serializer comes from the call to 
> TypeExtractor.getForClass(classOf[GenericRecord]).
> As a GenericRecord is not a pojo this call will produce a GenericTypeInfo
> which uses Kryo serialization.
>
> For a reference example I would recommend having a look at
> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
> working with GenericRecords. One important note. GenericRecords are not the
> best candidates for a data objects in Flink. The reason is if you apply any
> transformation on a GenericRecord e.g. map/flatMap. The input type
> information cannot be forwarded as the transformation is a black box from
> Flink's perspective. Therefore you would need to provide the type
> information for every step of the pipeline:
>
> TypeInformation info = ...
>
> sEnv.addSource(...) // produces info
>
> .map(...)
>
> .returns(info) // must be provided again, as the map transformation is a
> black box, the transformation might produce a completely different record
>
> Hope that helps a bit.
>
> Best,
>
> Dawid
> On 02/03/2020 09:04, Arvid Heise wrote:
>
> Hi Nitish,
>
> Kryo is the fallback serializer of Flink when everything else fails. In
> general, performance suffers quite a bit and it's not always applicable as
> in your case. Especially, in production code, it's best to avoid it
> completely.
>
> In your case, the issue is that your provided type information is
> completely meaningless. getProducedType is not providing any actual type
> information but just references to a generic skeleton. Flink uses the type
> information to reason about the value structures, which it cannot in your
> case.
>
> If you really need to resort to a completely generic serializer (which is
> usually not needed), then you have a few options:
> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that
> generic you probably have only a simple transformation before outputting it
> into some generic Kafka sink. So your UDF deserializes, does some generic
> stuff, and immediately turns it back into byte[].
> * Implement your own generic TypeInformation with serializer.
> WritableTypeInfo [1] is a generic example on how to do it. This will
> automatically convert byte[] back and forth to GenericRecord. That would be
> the recommended way when you have multiple transformations before source
> and sink.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>
> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant 
> wrote:
>
>> Hi all,
>>
>> I am trying to work with flink to get avro data from kafka for which the
>> schemas are stored in kafka schema registry. Since, the producer for kafka
>> is a totally different service(an MQTT consumer sinked to kafka), I can’t
>> have the schema with me at the consumer end. I read around and diverged to
>> the following implementation of KeyedDeserializationSchema but I cannot
>> understand why it’s throwing a `*com.esotericsoftware.kryo.KryoException:
>> java.lang.**NullPointerException*`
>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends
>> KeyedDeserializationSchema[GenericRecord] { // Flink needs the serializer
>> to be serializable => this "@transient lazy val" does the trick @transient
>> lazy val valueDeserializer = { val deserializer = new
>> KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>> deserializer.configure(
>> Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
>> schemaRegistryUrl, KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
>> -> false).asJava, false) deserializer } override def
>> isEndOfStream(nextElement: GenericRecord): Boolean = false override def
>> deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String,
>> partition: Int, offset: Long): 

Deploying Flink on minikube

2020-03-02 Thread msxu
On Ubuntu 16.04, I am deploying Flink on minikube with:
./bin/kubernetes-session.sh \
 -Dkubernetes.cluster-id=test \
 -Dtaskmanager.memory.process.size=1024m \
 -Dkubernetes.taskmanager.cpu=1 \
 -Dtaskmanager.numberOfTaskSlots=1 \
 -Dresourcemanager.taskmanager-timeout=360








Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi,

Thanks for the replies. I get that it is not wise to use GenericRecord and that 
is what is causing the Kryo fallback, but then if not this, how should I go 
about writing a AvroSchemaRegistrySchema for when I don’t know the schema. 
Without the knowledge of schema, I can’t create a class. Can you suggest a way 
of getting around that?

Thanks!

> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz  wrote:
> 
> Hi Nitish,
> 
> Just to slightly extend on Arvid's reply. As Arvid said the Kryo serializer 
> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As 
> a GenericRecord is not a pojo this call will produce a GenericTypeInfo which 
> uses Kryo serialization.
> 
> For a reference example I would recommend having a look at 
> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working 
> with GenericRecords. One important note. GenericRecords are not the best 
> candidates for a data objects in Flink. The reason is if you apply any 
> transformation on a GenericRecord e.g. map/flatMap. The input type 
> information cannot be forwarded as the transformation is a black box from 
> Flink's perspective. Therefore you would need to provide the type information 
> for every step of the pipeline:
> 
> TypeInformation info = ...
> 
> sEnv.addSource(...) // produces info
> 
> .map(...)
> 
> .returns(info) // must be provided again, as the map transformation is a 
> black box, the transformation might produce a completely different record
> 
> Hope that helps a bit.
> 
> Best,
> 
> Dawid
> 
> On 02/03/2020 09:04, Arvid Heise wrote:
>> Hi Nitish,
>> 
>> Kryo is the fallback serializer of Flink when everything else fails. In 
>> general, performance suffers quite a bit and it's not always applicable as 
>> in your case. Especially, in production code, it's best to avoid it 
>> completely.
>> 
>> In your case, the issue is that your provided type information is completely 
>> meaningless. getProducedType is not providing any actual type information 
>> but just references to a generic skeleton. Flink uses the type information 
>> to reason about the value structures, which it cannot in your case.
>> 
>> If you really need to resort to a completely generic serializer (which is 
>> usually not needed), then you have a few options:
>> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that 
>> generic you probably have only a simple transformation before outputting it 
>> into some generic Kafka sink. So your UDF deserializes, does some generic 
>> stuff, and immediately turns it back into byte[].
>> * Implement your own generic TypeInformation with serializer. 
>> WritableTypeInfo [1] is a generic example on how to do it. This will 
>> automatically convert byte[] back and forth to GenericRecord. That would be 
>> the recommended way when you have multiple transformations before source and 
>> sink.
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>>  
>> 
>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant > > wrote:
>> Hi all,
>> 
>> I am trying to work with flink to get avro data from kafka for which the 
>> schemas are stored in kafka schema registry. Since, the producer for kafka 
>> is a totally different service(an MQTT consumer sinked to kafka), I can’t 
>> have the schema with me at the consumer end. I read around and diverged to 
>> the following implementation of KeyedDeserializationSchema but I cannot 
>> understand why it’s throwing a `com.esotericsoftware.kryo.KryoException: 
>> java.lang.NullPointerException`
>> 
>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends 
>> KeyedDeserializationSchema[GenericRecord] {
>> 
>>   // Flink needs the serializer to be serializable => this "@transient lazy 
>> val" does the trick
>>   @transient lazy val valueDeserializer = {
>> val deserializer = new KafkaAvroDeserializer(new 
>> CachedSchemaRegistryClient(schemaRegistryUrl, 
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>> deserializer.configure(
>>   Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
>> schemaRegistryUrl,
>> KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> 
>> false).asJava,
>>   false)
>> deserializer
>>   }
>> 
>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>> 
>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>> topic: String, partition: Int, offset: Long): GenericRecord = {
>> 
>>  // val key = keyDeserializer.deserialize(topic, 
>> messageKey).asInstanceOf[String]
>>   val value = valueDeserializer.deserialize(topic, 
>> 

Re: Development related problems consultation

2020-03-02 Thread JingsongLee
Hi, welcome,

For the user side:

u...@flink.apache.org is for English.
user-zh@flink.apache.org is for Chinese.

d...@flink.apache.org is for development-related discussions, so please do not
send user questions there.

Best,
Jingsong Lee


--
From:王博迪 
Send Time: Mon, Mar 2, 2020, 17:08
To:user-zh ; dev 
Subject: Development related problems consultation

Hello, I am a new user of Flink. I would like to ask some development-related
questions, and I would like to know which mailing list I should use for them.
Thanks.

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Thank you Piotr!

One last question - let's assume my source is a Kafka topic - if I stop via
the CLI with a savepoint in Flink 1.9, but do not use that savepoint when
restarting my job - the job would continue from the last offset that has
been committed in Kafka and thus I would also not experience a loss of data
in my sink. Is that correct?

Best,
Tobi

On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski  wrote:

> Yes, that’s correct. There shouldn’t be any data loss. Stop with savepoint
> is a solution to make sure, that if you are stopping a job (either
> permanently or temporarily) that all of the results are published/committed
> to external systems before you actually stop the job.
>
> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
> was completing at the time cluster was crashing), some records might not be
> committed before the cancellation/kill/crash happened. Also note that
> doesn’t mean there is a data loss, just those records will be published
> once you restore your job from a checkpoint. If you want to stop the job
> permanently, that might not happen, hence we need stop with savepoint.
>
> Piotrek
>
> On 28 Feb 2020, at 15:02, Kaymak, Tobias  wrote:
>
> Thank you! For understanding the matter: When I have a streaming pipeline
> (reading from Kafka, writing somewhere) and I click "cancel" and after that
> I restart the pipeline - I should not expect any data to be lost - is that
> correct?
>
> Best,
> Tobias
>
> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski 
> wrote:
>
>> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>
>> On 28 Feb 2020, at 14:32, Yadong Xie  wrote:
>>
>> Hi
>>
>> 1. the old stop button was removed in flink 1.9.0 since it could not
>> work properly as I know
>> 2. if we have the feature of the stop with savepoint, we could add it to
>> the web UI, but it may still need some work on the rest API to support the
>> new feature
>>
>>
>> Best,
>> Yadong
>>
>>
>> Piotr Nowojski wrote on Fri, Feb 28, 2020 at 8:49 PM:
>>
>>> Hi,
>>>
>>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my
>>> knowledge and research:
>>>
>>> 1. In Flink 1.9 we switched from the old webUI to a new one, that
>>> probably explains the difference you are seeing.
>>> 2. The “Stop” button in the old webUI, was not working properly - that
>>> was not stop with savepoint, as stop with savepoint is a relatively new
>>> feature.
>>> 3. Now that we have stop with savepoint (it can be used from CLI as you
>>> wrote), probably we could expose this feature in the new UI as well, unless
>>> it’s already exposed somewhere? Yadong, do you know an answer for that?
>>>
>>> Piotrek
>>>
>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias 
>>> wrote:
>>>
>>> Hello,
>>>
>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after
>>> clicking that button in the webinterface it performed a clean shutdown. Now
>>> with Flink 1.9 I just see the option to cancel it.
>>>
>>> However, using the commandline flink stop -d
>>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the
>>> functionality is there.
>>>
>>> Has the button been removed on purpose?
>>>
>>> Best,
>>> Tobias
>>>
>>>
>>>
>>
>
> --
>
> Tobias Kaymak
> Data Engineer
> Data Intelligence
>
> tobias.kay...@ricardo.ch
> www.ricardo.ch
> Theilerstrasse 1a, 6300 Zug
>
>
>


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-02 Thread kant kodali
Hi Arvid,

Yes I got it..and it works as said in my previous email.

Thanks!


On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise  wrote:

> Hi Kant,
>
> I think Dawid meant to not add the Kafka version number like this:
>
> flinkShadowJar 
> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
>
> On Sun, Mar 1, 2020 at 7:31 PM kant kodali  wrote:
>
>> * What went wrong:
>> Could not determine the dependencies of task ':shadowJar'.
>> > Could not resolve all dependencies for configuration ':flinkShadowJar'.
>>> Could not find
>> org.apache.flink:flink-sql-connector-kafka_2.11:universal.
>>  Searched in the following locations:
>>-
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>-
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>-
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>-
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>  Required by:
>>  project :
>>
>>
>>
>> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> If you want to use the *universal *kafka connector you use "universal"
>>> for the version. The community decided to no longer distinguish different
>>> kafka connector versions, but to use the newest kafka client version for
>>> all versions of kafka 1.0+. So if you want to use the connector from
>>> flink-sql-connector-kafka_2.11 use "universal" for the version.
>>>
>>> As for the collect/print sink. We do realize importance of the sink and
>>> there were a few approaches to implement one. Including the TableUtils
>>> mentioned by godfrey. It does not have strong consistency guarantees and is
>>> recommended rather only for experiments/testing. There is also an ongoing
>>> discussion how to implement such a sink for *both *batch and streaming
>>> here:
>>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17046455#comment-17046455
>>>
>>> Best,
>>>
>>> Dawid
>>> On 01/03/2020 12:00, kant kodali wrote:
>>>
>>> Hi Benchao,
>>>
>>> That worked! Pasting the build.gradle file here. However, this only works
>>> for 0.11, and it needs zookeeper.connect(), which shouldn't be required. I'm not
>>> sure why it is required by the Flink Kafka connector. If I change the version
>>> to 2.2 in the code and specify this jar
>>>
>>> flinkShadowJar 
>>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>>
>>> or
>>>
>>> flinkShadowJar 
>>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" 
>>> //Not sure if I should use this one for Kafka >= 0.11
>>>
>>> It doesn't work either.
>>>
>>>
>>> buildscript {
>>>     repositories {
>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>     }
>>>     dependencies {
>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>     }
>>> }
>>> plugins {
>>>     id 'java'
>>>     id 'application'
>>> }
>>> mainClassName = 'Test'
>>> apply plugin: 'com.github.johnrengelman.shadow'
>>> ext {
>>>     javaVersion = '1.8'
>>>     flinkVersion = '1.10.0'
>>>     scalaBinaryVersion = '2.11'
>>>     slf4jVersion = '1.7.7'
>>>     log4jVersion = '1.2.17'
>>> }
>>> sourceCompatibility = javaVersion
>>> targetCompatibility = javaVersion
>>> tasks.withType(JavaCompile) {
>>>     options.encoding = 'UTF-8'
>>> }
>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>
>>> // declare where to find the dependencies of your project
>>> repositories {
>>>     mavenCentral()
>>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>>> }
>>>
>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>> // -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
>>> configurations {
>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>
>>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>     flinkShadowJar.exclude group: 'log4j'
>>> }
>>>
>>> // declare the dependencies for your production and test code
>>> dependencies {
>>>     // --
>>>     // Compile-time dependencies that should NOT be part of the

Re: [Question] enable end2end Kafka exactly once processing

2020-03-02 Thread Arvid Heise
Hi Eleanore,

the flink runner is maintained by the Beam developers, so it's best to ask
on their user list.

The documentation is, however, very clear. "Flink runner is one of the
runners whose checkpoint semantics are not compatible with current
implementation (hope to provide a solution in near future)."
So, Beam uses a different approach to EOS than Flink and there is currently
no way around it. Maybe, you could use the EOS Kafka Sink of Flink directly
and use that in Beam somehow.

I'm not aware of any work with the Beam devs to actually make it work.
Independently, we started to improve our interfaces for two phase commit
sinks (which is our approach). It might coincidentally help Beam.

Best,

Arvid

On Sun, Mar 1, 2020 at 8:23 PM Jin Yi  wrote:

> Hi experts,
>
> My application is using Apache Beam and with Flink to be the runner. My
> source and sink are kafka topics, and I am using KafkaIO connector provided
> by Apache Beam to consume and publish.
>
> I am reading through Beam's java doc:
> https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> It looks like Beam does not support the Flink runner for EOS. Can someone
> please shed some light on how to enable exactly-once processing with
> Apache Beam?
>
> Thanks a lot!
> Eleanore
>


Development related problems consultation

2020-03-02 Thread 王博迪
Hello, I am a new user of Flink. I would like to ask some development-related
questions, and I would like to know which mailing list I should use for them.
Thanks.

Re: Question about runtime filter

2020-03-02 Thread faaron zheng
Thanks for the reply, Lee. I followed your approach to debug the code, and I found
that the build side only calls addPreAggregatedAccumulator but never calls the
commit method. Furthermore, I added a breakpoint at future.handleAsync in the
asyncGetBroadcastBloomFilter method, but when the program stops at
if (e == null && accumulator != null), it finishes with the result immediately. Any suggestions?

JingsongLee wrote on Mon, Mar 2, 2020 at 3:22 PM:

> Hi,
>
> Does runtime filter probe side wait for building runtime filter?
> Can you check the start time of build side and probe side?
>
> Best,
> Jingsong Lee
>
> --
> From:faaron zheng 
> Send Time: Mon, Mar 2, 2020, 14:55
> To:user 
> Subject:Question about runtime filter
>
> Hi, everyone
>
> These days, I am trying to implement runtime filter in flink1.10 with
> flink-sql-benchmark  according to blink. I mainly change three part of
> flink code: add runtime filter rule; modify the code gen and bloomfilter;
> add some aggregatedaccumulator  methods according to accumulator. Now, It
> seems runtime filter works in execution graph as follows:
> Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
> i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
> i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
> i_manufact, i_size, i_formulation, i_color, i_units, i_container,
> i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
> -> Calc(select=[i_item_sk], where=[((i_category =
> _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
> (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])
>
> and
>
> Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
> d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
> d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
> d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
> d_same_day_lq, d_current_day, d_current_week, d_current_month,
> d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])
>
>
> However, the number of records sent is the same as in the normal case. Can anyone
> give me some advice?
>
>
>
> Thanks
>
>


Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Dawid Wysakowicz
Hi Nitish,

Just to slightly extend on Arvid's reply. As Arvid said the Kryo
serializer comes from the call to
TypeExtractor.getForClass(classOf[GenericRecord]). As GenericRecord is
not a POJO, this call will produce a GenericTypeInfo, which uses Kryo
serialization.

For a reference example I would recommend having a look at
AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
working with GenericRecords. One important note: GenericRecords are not
the best candidates for data objects in Flink. The reason is that if you
apply any transformation on a GenericRecord, e.g. map/flatMap, the input
type information cannot be forwarded, as the transformation is a black
box from Flink's perspective. Therefore you would need to provide the
type information for every step of the pipeline:

TypeInformation info = ...

sEnv.addSource(...)  // produces info
    .map(...)
    .returns(info)   // must be provided again, as the map transformation is a black box;
                     // the transformation might produce a completely different record
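To make that concrete, here is a minimal sketch using the Confluent registry format
(flink-avro-confluent-registry); it assumes a reader schema can be obtained on the consumer
side, and the schema string, topic name, and registry URL are placeholders.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

class GenericRecordPipelineSketch {
  static void build(StreamExecutionEnvironment sEnv, String schemaString, Properties kafkaProps) {
    Schema readerSchema = new Schema.Parser().parse(schemaString);
    TypeInformation<GenericRecord> info = new GenericRecordAvroTypeInfo(readerSchema);

    // The Confluent variant fetches the writer schema from the registry and decodes
    // into the given reader schema, so no Kryo fallback is involved at the source.
    DataStream<GenericRecord> records = sEnv.addSource(new FlinkKafkaConsumer<>(
        "input-topic", // placeholder topic
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://registry:8081"),
        kafkaProps));

    records
        .map(r -> r)    // any transformation turns the type into a black box again ...
        .returns(info); // ... so the Avro type information has to be re-declared
  }
}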

Hope that helps a bit.

Best,

Dawid

On 02/03/2020 09:04, Arvid Heise wrote:
> Hi Nitish,
>
> Kryo is the fallback serializer of Flink when everything else fails.
> In general, performance suffers quite a bit and it's not always
> applicable as in your case. Especially, in production code, it's best
> to avoid it completely.
>
> In your case, the issue is that your provided type information is
> completely meaningless. getProducedType is not providing any actual
> type information but just references to a generic skeleton. Flink uses
> the type information to reason about the value structures, which it
> cannot in your case.
>
> If you really need to resort to a completely generic serializer (which
> is usually not needed), then you have a few options:
> * Easiest, stick to byte[] and convert in a downstream UDF. If it's
> that generic you probably have only a simple transformation before
> outputting it into some generic Kafka sink. So your UDF deserializes,
> does some generic stuff, and immediately turns it back into byte[].
> * Implement your own generic TypeInformation with serializer.
> WritableTypeInfo [1] is a generic example on how to do it. This will
> automatically convert byte[] back and forth to GenericRecord. That
> would be the recommended way when you have multiple transformations
> before source and sink.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>
> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant  > wrote:
>
> Hi all,
>
> I am trying to work with flink to get avro data from kafka for
> which the schemas are stored in kafka schema registry. Since, the
> producer for kafka is a totally different service(an MQTT consumer
> sinked to kafka), I can’t have the schema with me at the consumer
> end. I read around and diverged to the following implementation of
> KeyedDeserializationSchema but I cannot understand why it’s
> throwing a `com.esotericsoftware.kryo.KryoException:
> java.lang.NullPointerException`
>
> class AvroDeserializationSchema(schemaRegistryUrl: String)
>     extends KeyedDeserializationSchema[GenericRecord] {
>
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>   @transient lazy val valueDeserializer = {
>     val deserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(
>       schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>     deserializer.configure(
>       Map(
>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>       ).asJava,
>       false)
>     deserializer
>   }
>
>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>       topic: String, partition: Int, offset: Long): GenericRecord = {
>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>     value
>   }
>
>   override def getProducedType: TypeInformation[GenericRecord] =
>     TypeExtractor.getForClass(classOf[GenericRecord])
> }
> I have no clue how to go about solving this. I saw a lot of people
> trying to implement the same. If someone can guide me, it’d be
> really helpful.
>
> Thanks!
> Nitish
>




Re: Source parallelism when reading Hive with Flink 1.10.0

2020-03-02 Thread JingsongLee
I recommend using Batch mode to read the Hive table.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 16:35
To:lzljs3620...@aliyun.com 
Subject: Re: Source parallelism when reading Hive with Flink 1.10.0

I am using StreamTableEnvironment, and I do run into this problem.
On March 2, 2020 16:16, JingsongLee wrote:
> Automatic inference may run into the problem of not being able to start due to insufficient resources

In theory that should not happen. Batch jobs can run partially.

Best,
Jingsong Lee

--
From:like 
Send Time: Monday, March 2, 2020 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject: Re: Source parallelism when reading Hive with Flink 1.10.0


Thank you very much! After I disabled automatic inference, I can now control the source parallelism.
Automatic inference may run into the problem of not being able to start due to insufficient resources.
 

On March 2, 2020 15:18, JingsongLee wrote:   Hi,

In 1.10, the parallelism of the Hive source is inferred automatically. You can control it with the following options in flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is used by default)
- table.exec.hive.infer-source-parallelism.max=1000 (maximum parallelism chosen by the inference)

By default the sink parallelism is the same as the upstream parallelism; if there is a shuffle, the configured global parallelism is used.
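For reference, a minimal sketch of setting the same options per job through the Table API
(Flink 1.10; the key names are as above, the values are only examples):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

class HiveSourceParallelismSketch {
  static void configure(TableEnvironment tEnv) {
    Configuration conf = tEnv.getConfig().getConfiguration();
    conf.setBoolean("table.exec.hive.infer-source-parallelism", false);   // disable inference entirely
    conf.setInteger("table.exec.hive.infer-source-parallelism.max", 200); // or just cap it (example value)
  }
}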

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 14:58
To:user-zh@flink.apache.org 
Subject: Source parallelism when reading Hive with Flink 1.10.0

Hi everyone,

I am reading a Hive table with Flink 1.10.0 and set a global parallelism at startup. While reading, I noticed that the sink parallelism respects this setting,
but the source parallelism does not; it changes with the size of the source and can go as high as 1000. How can I deal with this parallelism?

Re: Source parallelism when reading Hive with Flink 1.10.0

2020-03-02 Thread JingsongLee
> Automatic inference may run into the problem of not being able to start due to insufficient resources

In theory that should not happen. Batch jobs can run partially.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject: Re: Source parallelism when reading Hive with Flink 1.10.0

Thank you very much! After I disabled automatic inference, I can now control the source parallelism.
Automatic inference may run into the problem of not being able to start due to insufficient resources.
 

On March 2, 2020 15:18, JingsongLee wrote: Hi,

In 1.10, the parallelism of the Hive source is inferred automatically. You can control it with the following options in flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is used by default)
- table.exec.hive.infer-source-parallelism.max=1000 (maximum parallelism chosen by the inference)

By default the sink parallelism is the same as the upstream parallelism; if there is a shuffle, the configured global parallelism is used.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 14:58
To:user-zh@flink.apache.org 
Subject: Source parallelism when reading Hive with Flink 1.10.0

Hi everyone,

I am reading a Hive table with Flink 1.10.0 and set a global parallelism at startup. While reading, I noticed that the sink parallelism respects this setting,
but the source parallelism does not; it changes with the size of the source and can go as high as 1000. How can I deal with this parallelism?

Re: Exceptions in Web UI do not appear in logs

2020-03-02 Thread Arvid Heise
If an exception is unhandled in connectors, it will eventually be handled
by the runtime, where it is logged and the task fails. Doing both logging
and throwing an exception is an anti-pattern as the consumer of an
exception should have the sole responsibility of handling it correctly.
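A minimal sketch of the difference (consumeShard() and the logger are hypothetical
stand-ins, not the connector's actual code):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ErrorHandlingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ErrorHandlingSketch.class);

  static void consumeShard() throws Exception {} // hypothetical stand-in for the connector's work

  static void antiPattern() {
    try {
      consumeShard();
    } catch (Exception e) {
      LOG.error("Shard consumption failed", e); // duplicate reporting: the runtime logs it again
      throw new RuntimeException(e);
    }
  }

  static void preferred() {
    try {
      consumeShard();
    } catch (Exception e) {
      // Let it propagate: the Flink runtime logs the exception and fails the task.
      throw new RuntimeException("Shard consumption failed", e);
    }
  }
}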

In your case, the question is why the error is not properly logged on task
level. Since you are using a very old version of Flink, chances are high
that this issue is already resolved in a more recent version.

Do you have the option to upgrade?

On Sun, Mar 1, 2020 at 4:56 PM orips  wrote:

> Hi,
>
> It's version 1.5.2.
>
> I actually found the place in the code responsible for it.
> In the "catch" block, it doesn't log the error and it lets it propagate.
>
>
> https://github.com/apache/flink/blob/62839e88e15b338a8af9afcef698c38a194c592f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-02 Thread Arvid Heise
Hi Kant,

I think Dawid meant that the dependency should not contain a Kafka version number and should use the Flink version, like this:

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"


On Sun, Mar 1, 2020 at 7:31 PM kant kodali  wrote:

> * What went wrong:
> Could not determine the dependencies of task ':shadowJar'.
> > Could not resolve all dependencies for configuration ':flinkShadowJar'.
>> Could not find
> org.apache.flink:flink-sql-connector-kafka_2.11:universal.
>  Searched in the following locations:
>-
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>-
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>-
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>-
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>  Required by:
>  project :
>
>
>
> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Kant,
>>
>> If you want to use the universal Kafka connector, you use "universal"
>> for the version. The community decided to no longer distinguish different
>> Kafka connector versions, but to use the newest Kafka client version for
>> all versions of Kafka 1.0+. So if you want to use the connector from
>> flink-sql-connector-kafka_2.11, use "universal" for the version.
>>
>> As for the collect/print sink: we do realize the importance of such a sink, and
>> there have been a few approaches to implement one, including the TableUtils
>> mentioned by godfrey. It does not have strong consistency guarantees and is
>> recommended only for experiments/testing. There is also an ongoing
>> discussion on how to implement such a sink for both batch and streaming
>> here:
>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17046455#comment-17046455
>>
>> Best,
>>
>> Dawid
>> On 01/03/2020 12:00, kant kodali wrote:
>>
>> Hi Benchao,
>>
>> That worked! Pasting the build.gradle file here. However, this only works
>> for 0.11, and it needs zookeeper.connect(), which shouldn't be required. Not
>> sure why it is required in the Flink Kafka connector. If I change the version
>> to 2.2 in the code and specify this jar
>>
>> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>
>> or
>>
>> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
>> // Not sure if I should use this one for Kafka >= 0.11
>>
>> it doesn't work either.
>>
>>
>> buildscript {
>>     repositories {
>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>     }
>>     dependencies {
>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>     }
>> }
>>
>> plugins {
>>     id 'java'
>>     id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> ext {
>>     javaVersion = '1.8'
>>     flinkVersion = '1.10.0'
>>     scalaBinaryVersion = '2.11'
>>     slf4jVersion = '1.7.7'
>>     log4jVersion = '1.2.17'
>> }
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>>     options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>>     mavenCentral()
>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the
>> // libraries we want to be included in the "flinkShadowJar" configuration!
>> configurations {
>>     flinkShadowJar // dependencies which go into the shadowJar
>>
>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>     flinkShadowJar.exclude group: 'org.slf4j'
>>     flinkShadowJar.exclude group: 'log4j'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>>     // --
>>     // Compile-time dependencies that should NOT be part of the
>>     // shadow jar and are provided in the lib folder of Flink
>>     // --
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile 
>> 

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Hi Nitish,

Kryo is the fallback serializer of Flink when everything else fails. In
general, performance suffers quite a bit and it's not always applicable as
in your case. Especially, in production code, it's best to avoid it
completely.

In your case, the issue is that your provided type information is
completely meaningless. getProducedType is not providing any actual type
information but just references to a generic skeleton. Flink uses the type
information to reason about the value structures, which it cannot in your
case.

If you really need to resort to a completely generic serializer (which is
usually not needed), then you have a few options:
* Easiest, stick to byte[] and convert in a downstream UDF. If it's that
generic you probably have only a simple transformation before outputting it
into some generic Kafka sink. So your UDF deserializes, does some generic
stuff, and immediately turns it back into byte[].
* Implement your own generic TypeInformation with serializer.
WritableTypeInfo [1] is a generic example on how to do it. This will
automatically convert byte[] back and forth to GenericRecord. That would be
the recommended way when you have multiple transformations before source
and sink.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
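To make the first option a bit more concrete, here is a rough sketch of the byte[] approach;
the topic name and registry URL are placeholders and the Confluent deserializer setup is
simplified:

import java.util.Properties;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

class ByteArrayPipelineSketch {
  static void build(StreamExecutionEnvironment env, Properties kafkaProps) {
    // The source stays completely generic: hand the raw Kafka value through as byte[].
    DataStream<byte[]> raw = env.addSource(new FlinkKafkaConsumer<>(
        "input-topic", // placeholder topic
        new AbstractDeserializationSchema<byte[]>() {
          @Override
          public byte[] deserialize(byte[] message) {
            return message;
          }
        },
        kafkaProps));

    raw.map(new RichMapFunction<byte[], byte[]>() {
      private transient KafkaAvroDeserializer deserializer;

      @Override
      public void open(Configuration parameters) {
        deserializer = new KafkaAvroDeserializer(
            new CachedSchemaRegistryClient("http://registry:8081", 1000)); // placeholder registry
      }

      @Override
      public byte[] map(byte[] value) {
        GenericRecord record = (GenericRecord) deserializer.deserialize("input-topic", value);
        // ... do the generic transformation on `record` here ...
        return value; // back to byte[] right away, so Flink never has to serialize the GenericRecord
      }
    });
    // a byte[]-based Kafka producer would be attached as the sink here
  }
}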

On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant  wrote:

> Hi all,
>
> I am trying to work with flink to get avro data from kafka for which the
> schemas are stored in kafka schema registry. Since, the producer for kafka
> is a totally different service(an MQTT consumer sinked to kafka), I can’t
> have the schema with me at the consumer end. I read around and diverged to
> the following implementation of KeyedDeserializationSchema but I cannot
> understand why it’s throwing a `com.esotericsoftware.kryo.KryoException:
> java.lang.NullPointerException`
>
> class AvroDeserializationSchema(schemaRegistryUrl: String)
>     extends KeyedDeserializationSchema[GenericRecord] {
>
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>   @transient lazy val valueDeserializer = {
>     val deserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(
>       schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>     deserializer.configure(
>       Map(
>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>       ).asJava,
>       false)
>     deserializer
>   }
>
>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>       topic: String, partition: Int, offset: Long): GenericRecord = {
>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>     value
>   }
>
>   override def getProducedType: TypeInformation[GenericRecord] =
>     TypeExtractor.getForClass(classOf[GenericRecord])
> }
>
> I have no clue how to go about solving this. I saw a lot of people trying
> to implement the same. If someone can guide me, it’d be really helpful.
>
> Thanks!
> Nitish
>