How do I set the YARN application name when submitting a per-job cluster with the SQL CLI?

2021-11-30 Thread wangzy24
Hi,

   When submitting a per-job cluster with the SQL CLI, the default application name on YARN is "Flink per-job
cluster". How can this name be set?





Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread 刘建刚
Good work on Flink's batch processing!
The remote shuffle service can solve the container-lost problem and reduce
the running time of batch jobs after a failover. We have investigated the
component a lot and welcome Flink's native solution. We will try it and
help improve it.

Thanks,
Liu Jiangang

Yingjie Cao wrote on Tue, Nov 30, 2021 at 9:33 PM:

> Hi dev & users,
>
> We are happy to announce the open source of remote shuffle project [1] for
> Flink. The project is originated in Alibaba and the main motivation is to
> improve batch data processing for both performance & stability and further
> embrace cloud native. For more features about the project, please refer to
> [1].
>
> Before going open source, the project has been used widely in production
> and it behaves well on both stability and performance. We hope you enjoy
> it. Collaborations and feedbacks are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yun Gao
Many thanks for all the warm responses! More use cases and collaboration on Flink
Remote Shuffle and batch processing with Flink are very welcome~

Best,
Yun


--
From:Yingjie Cao 
Send Time:2021 Dec. 1 (Wed.) 11:16
To:dev 
Subject:Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch 
data processing

Hi Jing,

Great to hear that, collaborations and feedbacks are welcomed.

Best,
Yingjie

Jing Zhang wrote on Wed, Dec 1, 2021 at 10:34 AM:

> Amazing!
> Remote shuffle service is an important improvement for batch data
> processing experience on Flink.
> It is also a strong requirement in our internal batch business, we would
> try it soon and give you feedback.
>
> Best,
> Jing Zhang
>
> Martijn Visser wrote on Wed, Dec 1, 2021 at 3:25 AM:
>
> > Hi Yingjie,
> >
> > This is great, thanks for sharing. Will you also add it to
> > https://flink-packages.org/ ?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 30 Nov 2021 at 17:31, Till Rohrmann 
> wrote:
> >
> > > Great news, Yingjie. Thanks a lot for sharing this information with the
> > > community and kudos to all the contributors of the external shuffle
> > service
> > > :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > > wrote:
> > >
> > > > Hi dev & users,
> > > >
> > > > We are happy to announce the open source of remote shuffle project
> [1]
> > > for
> > > > Flink. The project is originated in Alibaba and the main motivation
> is
> > to
> > > > improve batch data processing for both performance & stability and
> > > further
> > > > embrace cloud native. For more features about the project, please
> refer
> > > to
> > > > [1].
> > > >
> > > > Before going open source, the project has been used widely in
> > production
> > > > and it behaves well on both stability and performance. We hope you
> > enjoy
> > > > it. Collaborations and feedbacks are highly appreciated.
> > > >
> > > > Best,
> > > > Yingjie on behalf of all contributors
> > > >
> > > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > > >
> > >
> >
>



Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Hang Ruan
Hi,

In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing is
enabled is the default behavior of the KafkaSource built by KafkaSourceBuilder,
and it cannot be changed through the builder.

FLINK-24277 makes this behavior configurable. The fix will be released in
1.12.6, so it is not contained in your version yet.

Reading that PR will help you understand the behavior.
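For reference, a rough sketch of how this can look on a version where the option actually
takes effect (per the above, 1.12.6 or later); the broker, topic and group id are made-up
placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaCommitOnCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are only committed when checkpointing is enabled

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("my-topic")                // placeholder
                .setGroupId("my-group")               // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // the string key behind KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT;
                // on 1.12.0-1.12.5 this is effectively always true and cannot be turned off
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute();
    }
}
```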


Marco Villalobos wrote on Wed, Dec 1, 2021 at 3:43 AM:

> Thanks!
>
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
> does not exist in Flink 1.12.
>
> Is that property supported with the string "commit.offsets.on.checkpoints"?
>
> How do I configure that behavior so that offsets get committed on
> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
> default behavior with checkpoints?
>
>
>
>
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>
>> Hi,
>>
>> Maybe you can write like this :
>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>> "true");
>>
>> Other additional properties could be found here :
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>
>> Marco Villalobos wrote on Tue, Nov 30, 2021 at 11:08 AM:
>>
>>> Thank you for the information.  That still does not answer my question
>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>> method.
>>>
>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>> default behavior with checkpoints?
>>>
>>> -Marco
>>>
>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
>>> wrote:
>>>
 Hi!

 Flink 1.14 release note states about this. See [1].

 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

 Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:

> Hi everybody,
>
> I am using Flink 1.12 and migrating my code from using
> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>
> FlinkKafkaConsumer has the method
>
> /**
>>  * Specifies whether or not the consumer should commit offsets back
>> to Kafka on checkpoints.
>>  * This setting will only have effect if checkpointing is enabled for
>> the job. If checkpointing isn't
>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>> "enable.auto.commit" (for 0.9+) property
>>  * settings will be used.
>> */
>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>
>
> How do I setup that parameter when using the KafkaSourceBuilder? If I
> already have checkpointing configured, is it necessary to setup "commit
> offsets on checkpoints"?
>
> The Flink 1.12 documentation does not discuss this topic, and the
> Flink 1.14 documentation says little about it.
>
>  For example, the Flink 1.14 documentation states:
>
> Additional Properties
>> In addition to properties described above, you can set arbitrary
>> properties for KafkaSource and KafkaConsumer by using
>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>> has
>> following options for configuration:
>> commit.offsets.on.checkpoint specifies whether to commit consuming
>> offsets to Kafka brokers on checkpoint
>
>
> And the 1.12 documentation states:
>
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>> consume records from a topic and periodically checkpoint all its Kafka
>> offsets, together with the state of other operations. In case of a job
>> failure, Flink will restore the streaming program to the state of the
>> latest checkpoint and re-consume the records from Kafka, starting from 
>> the
>> offsets that were stored in the checkpoint.
>> The interval of drawing checkpoints therefore defines how much the
>> program may have to go back at most, in case of a failure. To use fault
>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>> enabled
>> in the job.
>> If checkpointing is disabled, the Kafka consumer will periodically
>> commit the offsets to Zookeeper.
>
>
> Thank you.
>
> Marco
>
>
>


Re: Does Flink ever delete any sink S3 files?

2021-11-30 Thread Yun Gao
Hi Dan,

The file sink would first write records to temporary files,
namely .part-*, and commit them on checkpoint succeeding
by renaming them to formal files, namely part-*.

Best,
Yun


--
From:Dan Hill 
Send Time:2021 Dec. 1 (Wed.) 13:51
To:user 
Subject:Does Flink ever delete any sink S3 files?

Hi.

I'm debugging an issue with a system that listens for files written to S3.  I'm 
assuming Flink does not modify sink objects after they've been written.  I've 
seen a minicluster test locally write a ".part-" file.  I wanted to double 
check to make sure S3 sinks won't write and delete files.

```
FileSink.forBulkFormat(new Path(...), factory)
.withBucketAssigner(new DateHourBucketAssigner<...>(...))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build())
.uid("sink-s3")
.name("S3 sink");
```




Does Flink ever delete any sink S3 files?

2021-11-30 Thread Dan Hill
Hi.

I'm debugging an issue with a system that listens for files written to S3.
I'm assuming Flink does not modify sink objects after they've been
written.  I've seen a minicluster test locally write a ".part-" file.  I
wanted to double check to make sure S3 sinks won't write and delete files.

```
FileSink.forBulkFormat(new Path(...), factory)
.withBucketAssigner(new DateHourBucketAssigner<...>(...))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build())
.uid("sink-s3")
.name("S3 sink");
```


Re: Flink SQL JDBC partitioned scan does not work with a timestamp column

2021-11-30 Thread 天下五帝东
Hi,
I did set long values, but it still does not take effect.



> On Dec 1, 2021, at 10:33 AM, Caizhi Weng wrote:
> 
> Hi!
> 
> scan.partition.lower-bound and scan.partition.upper-bound are both long values (not
> timestamp strings). They are turned into a "WHERE <column> BETWEEN <lower-bound> AND
> <upper-bound>" SQL statement that fetches the data via JDBC. Please check whether the format and value range of these options match your expectation.
> 
天下五帝东 wrote on Wed, Dec 1, 2021 at 9:23 AM:
> 
>> Hi,
>>   I am testing the partitioned scan feature of the Flink SQL JDBC connector and found that when
>> scan.partition.column is of timestamp type, even with concrete values for scan.partition.lower-bound
>> 
>> and scan.partition.upper-bound, no data is read. Could anyone help explain this?
>> 
>> Thanks
>> 
>> 



Re:Re: Collecting rows after GROUP BY in Flink SQL

2021-11-30 Thread casel.chen
What I want is a generic way of collecting ROW values into a collection (ARRAY, both deduplicated and not), not something tied to one specific ROW type.
Declaring @DataTypeHint("ROW") with the concrete fields spelled out works fine; declaring it the generic way I need
raises an error.

On 2021-12-01 11:12:27, "Caizhi Weng" wrote:
>Hi!
>
>UDFs do support the ROW type; see the examples about ROW in [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen wrote on Wed, Dec 1, 2021 at 7:56 AM:
>
>> In our business we want to collect all the rows after a Flink SQL GROUP BY, for example:
>>
>>
>> Kafka source table:
>> class_no  student_no  name  age
>> 1         20001       张三    15
>> 2         20011       李四    16
>> 1         20002       王五    16
>> 2         20012       吴六    15
>>
>>
>> create table source_table (
>>    class_no INT,
>>    student_no INT,
>>    name STRING,
>>    age INT
>> ) with (
>>    'connector' = 'kafka',
>>    ...
>> );
>>
>>
>> MongoDB sink table:
>> class_no  students
>> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students ARRAY<ROW<...>>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> Looking through Flink's built-in functions, the only one that comes close is COLLECT.
>> insert into sink_table select class_no, collect(ROW(student_no, name, age))
>> from source_table group by class_no;
>>
>>
>> But it returns a Multiset, i.e. a Map<ROW, Integer>. When the key type is ROW, as in my scenario, writing directly to MongoDB throws an error, because the key type is forcibly converted to STRING.
>> Besides, I only want an Array[ROW] here, which is essentially just the key set of the Map, i.e. the deduplicated array.
>>
>>
>> 1.
>> Is there any way to collect a deduplicated Array[ROW]? I tried writing a UDF, but it seemed UDFs do not support the ROW type, only concrete data types. Any suggestions or reference examples?
>> 2. How should I write it if I want a non-deduplicated Array[ROW]?
>> 3. What Flink SQL syntax should I use to access the keys and the values of a Map-typed column?
>>
>>
>> Thanks for any answers!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>


Re:Re: How to use async I/O to join dimension tables in Flink SQL?

2021-11-30 Thread Michael Ran
Hello, a quick question: does the async join of connector-hbase currently guarantee ordering?
On 2021-03-05 11:10:41, "Leonard Xu" wrote:
>Currently no connector in Flink SQL implements async I/O dimension-table lookup, but the interface already supports it; if you implement it yourself you can refer to [1].
>In addition, someone in the community is adding async I/O dimension-table lookup to the HBase connector, which is expected to be usable in 1.13 [2].
>
>Best regards
>
>[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java#L618
>[2] https://github.com/apache/flink/pull/14684#pullrequestreview-604148209 
>
>
>
>
>> On Mar 4, 2021, at 14:22, HunterXHunter <1356469...@qq.com> wrote:
>> 
>> Define a source table
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: REST API for detached minicluster based integration test

2021-11-30 Thread Caizhi Weng
Hi!

I see. So to test your watermark strategy you would like to fetch the
watermarks downstream.

I would suggest taking a look at
org.apache.flink.streaming.api.operators.AbstractStreamOperator. This class
has a processWatermark method, which is called when a watermark flows
through this operator. You can make your own testing operator by extending
this class and stuff the testing operator in a
org.apache.flink.streaming.api.transformations.OneInputTransformation. In
this case you do not need to fetch watermarks from the metrics. If
processWatermark is never called then it means no watermark ever comes and
you might want to check your watermark strategy implementation.
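To make that concrete, a rough sketch of such a probe operator (assuming the 1.12 DataStream
API; the static list is an assumption that only works because MiniCluster tests run in a
single JVM):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkProbe<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    // Visible to the test code because the MiniCluster runs in the same JVM.
    public static final List<Long> SEEN_WATERMARKS =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element); // pass every record through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        SEEN_WATERMARKS.add(mark.getTimestamp()); // record the watermark for assertions
        super.processWatermark(mark);             // and keep forwarding it downstream
    }

    // Wires the probe in via DataStream#transform, which creates a OneInputTransformation.
    public static <T> DataStream<T> probe(DataStream<T> in) {
        return in.transform("watermark-probe", in.getType(), new WatermarkProbe<T>());
    }
}
```

If SEEN_WATERMARKS stays empty after the job finishes, no watermark ever reached that point
in the pipeline, which points at the watermark strategy rather than at the operators behind it.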

Jin Yi wrote on Wed, Dec 1, 2021 at 4:14 AM:

> thanks for the reply caizhi!
>
> we're on flink 1.12.3.  in the test, i'm using a custom watermark strategy
> that is derived from BoundedOutOfOrdernessWatermarks that emits watermarks
> using processing time after a period of no events to keep the timer-reliant
> operators happy.  basically, it's using event time for everything, but the
> inputs have watermarks periodically output if there's no events coming in
> through them.
>
> we started w/ test data w/ their own event times in the tests and simply
> used the SEE.fromCollection with a timestamp assigner that extracts the
> timestamp from the test event data.  however, doing things this way, the
> minicluster during the test terminates (and completes the test) once all
> the input is processed, even though there are timers in the operators that
> are meant to supply additional output still outstanding.  so, that's why i
> cobbled together an attempt to control when the input sources are complete
> by using the posted WaitingSourceFunction to send the signal to
> close/cancel the input stream based on some form of state checking on the
> job (which is where this thread starts).
>
> what's the best way to achieve what i need?  i would love to set the
> inputs up so that watermarks get emitted appropriately throughout the
> processing of the test data as well as for a defined period after all the
> "input" is complete such that the timer-reliant operators get their timers
> triggered.  thanks!
>
> On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> I believe metrics are enabled by default even for a mini cluster. Which
>> Flink version are you using and how do you set your watermark strategy?
>> Could you share your user code about how to create the datastream / SQL and
>> get the job graph?
>>
>> I'm also curious about why do you need to extract the output watermarks
>> just for stopping the source. You can control the records and the watermark
>> strategy from the source. From my point of view, constructing some test
>> data with some specific row time would be enough.
>>
>> Jin Yi wrote on Tue, Nov 30, 2021 at 3:34 AM:
>>
>>> bump.  a more general question is what do people do for more end to end,
>>> full integration tests to test event time based jobs with timers?
>>>
>>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi  wrote:
>>>
 i am writing an integration test where i execute a streaming flink job
 using faked, "unbounded" input where i want to control when the source
 function(s) complete by triggering them once the job's operator's maximum
 output watermarks are beyond some job completion watermark that's relative
 to the maximum input timestamp because the flink job uses event time timers
 to produce some output.

 here is the faked, "unbounded" source function class:

   private static class WaitingSourceFunction extends
 FromElementsFunction {

 private boolean isWaiting;

 private TypeInformation typeInfo;


 private WaitingSourceFunction(

 StreamExecutionEnvironment env, Collection data,
 TypeInformation typeInfo)

 throws IOException {

   super(typeInfo.createSerializer(env.getConfig()), data);

   this.isWaiting = true;

   this.typeInfo = typeInfo;

 }


 @Override

 public void cancel() {

   super.cancel();

   isWaiting = false;

 }


 @Override

 public void run(SourceContext ctx) throws Exception {

   super.run(ctx);

   while (isWaiting) {

 TimeUnit.SECONDS.sleep(10);

   }

 }


 public long getEndWatermark() {

   // *TODO*

   return 100;

 }

   }

 and here is function where i want to busy wait (currently hacked up to
 print info to show my problem):

   private void waitForDone(String jobName, WaitingSourceFunction...
 functions)

   throws ConfigurationException, Exception, ExecutionException,
 IOException, InterruptedException {

 JobExecutionResult jobResult = env.execute(jobName);


Re: Collecting rows after GROUP BY in Flink SQL

2021-11-30 Thread Caizhi Weng
Hi!

UDFs do support the ROW type; see the examples about ROW in [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
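As a small illustration of what [1] describes, a scalar UDF can both take and return ROW
values once the types are declared with @DataTypeHint (the field names below just mirror
the example from the quoted mail and are otherwise arbitrary):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Takes a ROW argument and returns a (smaller) ROW result; both data types
// are declared explicitly via @DataTypeHint, as shown in the linked docs.
public class StudentProjectFunction extends ScalarFunction {

    public @DataTypeHint("ROW<student_no INT, name STRING>") Row eval(
            @DataTypeHint("ROW<student_no INT, name STRING, age INT>") Row student) {
        return Row.of(student.getField(0), student.getField(1));
    }
}
```

Collecting such rows into an ARRAY<ROW<...>>, with or without deduplication, would still
need an aggregate function on top, but the type-hint mechanism is the same.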

casel.chen wrote on Wed, Dec 1, 2021 at 7:56 AM:

> In our business we want to collect all the rows after a Flink SQL GROUP BY, for example:
>
>
> Kafka source table:
> class_no  student_no  name  age
> 1         20001       张三    15
> 2         20011       李四    16
> 1         20002       王五    16
> 2         20012       吴六    15
>
>
> create table source_table (
>    class_no INT,
>    student_no INT,
>    name STRING,
>    age INT
> ) with (
>    'connector' = 'kafka',
>    ...
> );
>
>
> MongoDB sink table:
> class_no  students
> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
> 20002, "name":"王五", "age": 16}]
> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
> 20012, "name":"吴六", "age": 15}]
>
>
> create table sink_table (
>   class_no INT,
>   students ARRAY<ROW<...>>
> ) with (
>   'connector' = 'mongodb',
>   ...
> );
>
>
> Looking through Flink's built-in functions, the only one that comes close is COLLECT.
> insert into sink_table select class_no, collect(ROW(student_no, name, age))
> from source_table group by class_no;
>
>
> But it returns a Multiset, i.e. a Map<ROW, Integer>. When the key type is ROW, as in my scenario, writing directly to MongoDB throws an error, because the key type is forcibly converted to STRING.
> Besides, I only want an Array[ROW] here, which is essentially just the key set of the Map, i.e. the deduplicated array.
>
>
> 1.
> Is there any way to collect a deduplicated Array[ROW]? I tried writing a UDF, but it seemed UDFs do not support the ROW type, only concrete data types. Any suggestions or reference examples?
> 2. How should I write it if I want a non-deduplicated Array[ROW]?
> 3. What Flink SQL syntax should I use to access the keys and the values of a Map-typed column?
>
>
> Thanks for any answers!
>
>
>
>
>
>
>
>
>
>
>
>


Re: Can CASE WHEN no longer be used inside ROW() in Flink SQL?

2021-11-30 Thread Caizhi Weng
Hi!

Currently the ROW constructor does not support calling functions inside it. It is recommended to create a view first and compute the needed values there.
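A minimal sketch of that workaround, reusing the statement from the quoted mail (the
datagen/blackhole tables only stand in for the real source and sink; the CASE WHEN is
computed in a view so that ROW() only references plain columns):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowWithCaseWhenExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-ins for the real tables from the question.
        tEnv.executeSql(
                "CREATE TABLE source_table (field1 INT, field2 STRING, field3 STRING) "
                        + "WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "CREATE TABLE sink_table (r ROW<field1 INT, field2 STRING, field3 STRING>) "
                        + "WITH ('connector' = 'blackhole')");

        // Compute the CASE WHEN result in a view first ...
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW source_with_flag AS "
                        + "SELECT field1, field2, "
                        + "  CASE WHEN field3 = 'xxx' THEN 'T' ELSE 'F' END AS field3_flag "
                        + "FROM source_table");

        // ... so that the ROW constructor only sees plain columns.
        tEnv.executeSql(
                "INSERT INTO sink_table "
                        + "SELECT ROW(field1, field2, field3_flag) FROM source_with_flag");
    }
}
```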

casel.chen wrote on Wed, Dec 1, 2021 at 7:59 AM:

>
>
> select ROW(field1, field2, case when field3 = 'xxx' then 'T' else 'F' as
> field3) from source_table
>
>
> A statement like this does not pass the syntax check.


Re: Too many INSERT INTO statements in Flink SQL

2021-11-30 Thread Caizhi Weng
Hi!

Thanks for raising this. Option 1 should be the most suitable; "failing because the operator name exceeds the length limit" is not expected behavior. What exactly does the error stack look like?

casel.chen wrote on Wed, Dec 1, 2021 at 8:10 AM:

> We have a scenario where many different alert rules read from one Kafka source table and write into the same alert sink table. There are roughly 300+ rules, implemented in pure Flink SQL.
>
>
> Option 1 creates a view that UNION ALLs the different rules and then inserts the view into the sink table. We found that all the operators get chained into one, and since a Flink
> SQL operator's name is the SQL content itself, the job fails because the operator name exceeds the length limit. So we turned to option 2.
> Option 2 uses one INSERT INTO statement per rule. The generated graph then has a very large fan-
> out. There is no over-long operator name this time, but job startup is extremely slow. Since rules will keep being modified, added or removed later on, such a slow startup is unacceptable.
>
>
> What is the most suitable approach for this scenario? Thanks!


Re: Accessing multiple OSS buckets from Flink

2021-11-30 Thread Caizhi Weng
Hi!

If only the bucket differs, specifying the path in the WITH options is enough.

If even the access key id and secret
differ, you can consider implementing your own com.aliyun.oss.common.auth.CredentialsProvider interface and setting
fs.oss.credentials.provider in the Flink configuration to the corresponding implementation class.

casel.chen wrote on Wed, Dec 1, 2021 at 8:14 AM:

> Our Flink platform jobs write data to a customer's OSS bucket, which is not the same OSS
> bucket the jobs themselves use for checkpoints/savepoints.
> Does Flink support this scenario, and if so, how should it be configured? Thanks!


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Jingsong Li
Amazing!

Thanks Yingjie and all contributors for your great work.

Best,
Jingsong

On Wed, Dec 1, 2021 at 10:52 AM Yun Tang  wrote:
>
> Great news!
> Thanks for all the guys who contributed in this project.
>
> Best
> Yun Tang
>
> On 2021/11/30 16:30:52 Till Rohrmann wrote:
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open source of remote shuffle project [1] for
> > > Flink. The project is originated in Alibaba and the main motivation is to
> > > improve batch data processing for both performance & stability and further
> > > embrace cloud native. For more features about the project, please refer to
> > > [1].
> > >
> > > Before going open source, the project has been used widely in production
> > > and it behaves well on both stability and performance. We hope you enjoy
> > > it. Collaborations and feedbacks are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >



-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yun Tang
Great news!
Thanks to everyone who contributed to this project.

Best
Yun Tang

On 2021/11/30 16:30:52 Till Rohrmann wrote:
> Great news, Yingjie. Thanks a lot for sharing this information with the
> community and kudos to all the contributors of the external shuffle service
> :-)
> 
> Cheers,
> Till
> 
> On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> 
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1] for
> > Flink. The project is originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and further
> > embrace cloud native. For more features about the project, please refer to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaborations and feedbacks are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
> 


Fine-grained, per-operator state TTL configuration in Flink SQL

2021-11-30 Thread gygz...@163.com
Hi all,

In production we found that configuring a state TTL in SQL makes the TTL take effect globally.

Suppose I have the following SQL:

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

If I configure a global TTL, the state of the GroupAggFunction behind count gets expired, e.g. the accumulated count is reset to zero after one day.

If I don't configure one, the state of the regular join keeps growing.

This is just one scenario, given here only as an example.

The main question is: when part of a SQL job needs its own state TTL, how should this kind of scenario be handled?



gygz...@163.com


Re: Flink SQL JDBC partitioned scan does not work with a timestamp column

2021-11-30 Thread Caizhi Weng
Hi!

scan.partition.lower-bound and scan.partition.upper-bound are both long values (not
timestamp strings). They are turned into a "WHERE <column> BETWEEN <lower-bound> AND <upper-bound>"
SQL statement that fetches the data via JDBC. Please check whether the format and value range of these options match your expectation.
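For illustration, a sketch of these options on a numeric partition column, with both bounds
given as plain longs (the URL, table name and schema are made-up placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScanExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id BIGINT,"
                        + "  amount DECIMAL(10, 2)"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:mysql://localhost:3306/mydb',"  // placeholder
                        + "  'table-name' = 'orders',"                     // placeholder
                        + "  'scan.partition.column' = 'id',"
                        + "  'scan.partition.num' = '4',"
                        + "  'scan.partition.lower-bound' = '0',"          // a long, not a timestamp string
                        + "  'scan.partition.upper-bound' = '1000000'"
                        + ")");
        // A scan on this table is split into 4 parallel reads, each with its own
        // "WHERE id BETWEEN ... AND ..." range derived from the bounds above.
    }
}
```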

天下五帝东 wrote on Wed, Dec 1, 2021 at 9:23 AM:

> Hi,
> I am testing the partitioned scan feature of the Flink SQL JDBC connector and found that when
> scan.partition.column is of timestamp type, even with concrete values for scan.partition.lower-bound
>
> and scan.partition.upper-bound, no data is read. Could anyone help explain this?
>
> Thanks
>
>


Cutoff memory allocation in the Flink 1.9 memory model

2021-11-30 Thread mayifan
Hi all!

I am doing memory tuning based on Flink 1.9. As I understand it, in 1.9 the cutoff is reserved for all memory overhead not accounted for by Flink, such as RocksDB and JVM internal overhead.

The default cutoff ratio is 0.25. If this default ratio is used uniformly across TaskManagers, it obviously wastes some memory in certain scenarios.

To improve the utilization of this part of memory, are there any good recommendations for sizing the cutoff in different scenarios?

For example: RocksDB state backend vs. FS state backend? Different numbers of slots per TaskManager?

Thanks!

mayifan
mayi...@88.com

Flink SQL JDBC partitioned scan does not work with a timestamp column

2021-11-30 Thread 天下五帝东
Hi,
   I am testing the partitioned scan feature of the Flink SQL JDBC connector and found that when
scan.partition.column is of timestamp type, even with concrete values for scan.partition.lower-bound

and scan.partition.upper-bound, no data is read. Could anyone help explain this?

Thanks

Accessing multiple OSS buckets from Flink

2021-11-30 Thread casel.chen
Our Flink platform jobs write data to a customer's OSS bucket, which is not the same OSS bucket the jobs themselves use for checkpoints/savepoints.
Does Flink support this scenario, and if so, how should it be configured? Thanks!

Too many INSERT INTO statements in Flink SQL

2021-11-30 Thread casel.chen
We have a scenario where many different alert rules read from one Kafka source table and write into the same alert sink table. There are roughly 300+ rules, implemented in pure Flink SQL.


Option 1 creates a view that UNION ALLs the different rules and then inserts the view into the sink table. We found that all the operators get chained into one, and since a Flink SQL
operator's name is the SQL content itself, the job fails because the operator name exceeds the length limit. So we turned to option 2.
Option 2 uses one INSERT INTO statement per rule. The generated graph then has a very large fan-
out. There is no over-long operator name this time, but job startup is extremely slow. Since rules will keep being modified, added or removed later on, such a slow startup is unacceptable.


What is the most suitable approach for this scenario? Thanks!

Can CASE WHEN no longer be used inside ROW() in Flink SQL?

2021-11-30 Thread casel.chen


select ROW(field1, field2, case when field3 = 'xxx' then 'T' else 'F' as 
field3) from source_table 


A statement like this does not pass the syntax check.

Collecting rows after GROUP BY in Flink SQL

2021-11-30 Thread casel.chen
In our business we want to collect all the rows after a Flink SQL GROUP BY, for example:


Kafka source table:
class_no  student_no  name  age
1         20001       张三    15
2         20011       李四    16
1         20002       王五    16
2         20012       吴六    15


create table source_table (
   class_no INT,
   student_no INT,
   name STRING,
   age INT
) with (
   'connector' = 'kafka',
   ...
);


MongoDB sink table:
class_no  students
1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]


create table sink_table (
  class_no INT,
  students ARRAY<ROW<...>>
) with (
  'connector' = 'mongodb',
  ...
);


Looking through Flink's built-in functions, the only one that comes close is COLLECT.
insert into sink_table select class_no, collect(ROW(student_no, name, age)) from 
source_table group by class_no;


But it returns a Multiset, i.e. a Map<ROW, Integer>. When the key type is ROW, as in my scenario, writing directly to MongoDB throws an error, because the key type is forcibly converted to STRING.
Besides, I only want an Array[ROW] here, which is essentially just the key set of the Map, i.e. the deduplicated array.


1. Is there any way to collect a deduplicated Array[ROW]? I tried writing a UDF, but it seemed UDFs do not support the ROW type, only concrete data types. Any suggestions or reference examples?
2. How should I write it if I want a non-deduplicated Array[ROW]?
3. What Flink SQL syntax should I use to access the keys and the values of a Map-typed column?


Thanks for any answers!













Re: REST API for detached minicluster based integration test

2021-11-30 Thread Jin Yi
thanks for the reply caizhi!

we're on flink 1.12.3.  in the test, i'm using a custom watermark strategy
that is derived from BoundedOutOfOrdernessWatermarks that emits watermarks
using processing time after a period of no events to keep the timer-reliant
operators happy.  basically, it's using event time for everything, but the
inputs have watermarks periodically output if there's no events coming in
through them.

we started w/ test data w/ their own event times in the tests and simply
used the SEE.fromCollection with a timestamp assigner that extracts the
timestamp from the test event data.  however, doing things this way, the
minicluster during the test terminates (and completes the test) once all
the input is processed, even though there are timers in the operators that
are meant to supply additional output still outstanding.  so, that's why i
cobbled together an attempt to control when the input sources are complete
by using the posted WaitingSourceFunction to send the signal to
close/cancel the input stream based on some form of state checking on the
job (which is where this thread starts).

what's the best way to achieve what i need?  i would love to set the inputs
up so that watermarks get emitted appropriately throughout the processing
of the test data as well as for a defined period after all the "input" is
complete such that the timer-reliant operators get their timers triggered.
thanks!

On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng  wrote:

> Hi!
>
> I believe metrics are enabled by default even for a mini cluster. Which
> Flink version are you using and how do you set your watermark strategy?
> Could you share your user code about how to create the datastream / SQL and
> get the job graph?
>
> I'm also curious about why do you need to extract the output watermarks
> just for stopping the source. You can control the records and the watermark
> strategy from the source. From my point of view, constructing some test
> data with some specific row time would be enough.
>
> Jin Yi wrote on Tue, Nov 30, 2021 at 3:34 AM:
>
>> bump.  a more general question is what do people do for more end to end,
>> full integration tests to test event time based jobs with timers?
>>
>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi  wrote:
>>
>>> i am writing an integration test where i execute a streaming flink job
>>> using faked, "unbounded" input where i want to control when the source
>>> function(s) complete by triggering them once the job's operator's maximum
>>> output watermarks are beyond some job completion watermark that's relative
>>> to the maximum input timestamp because the flink job uses event time timers
>>> to produce some output.
>>>
>>> here is the faked, "unbounded" source function class:
>>>
>>>   private static class WaitingSourceFunction extends
>>> FromElementsFunction {
>>>
>>> private boolean isWaiting;
>>>
>>> private TypeInformation typeInfo;
>>>
>>>
>>> private WaitingSourceFunction(
>>>
>>> StreamExecutionEnvironment env, Collection data,
>>> TypeInformation typeInfo)
>>>
>>> throws IOException {
>>>
>>>   super(typeInfo.createSerializer(env.getConfig()), data);
>>>
>>>   this.isWaiting = true;
>>>
>>>   this.typeInfo = typeInfo;
>>>
>>> }
>>>
>>>
>>> @Override
>>>
>>> public void cancel() {
>>>
>>>   super.cancel();
>>>
>>>   isWaiting = false;
>>>
>>> }
>>>
>>>
>>> @Override
>>>
>>> public void run(SourceContext ctx) throws Exception {
>>>
>>>   super.run(ctx);
>>>
>>>   while (isWaiting) {
>>>
>>> TimeUnit.SECONDS.sleep(10);
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>> public long getEndWatermark() {
>>>
>>>   // *TODO*
>>>
>>>   return 100;
>>>
>>> }
>>>
>>>   }
>>>
>>> and here is function where i want to busy wait (currently hacked up to
>>> print info to show my problem):
>>>
>>>   private void waitForDone(String jobName, WaitingSourceFunction...
>>> functions)
>>>
>>>   throws ConfigurationException, Exception, ExecutionException,
>>> IOException, InterruptedException {
>>>
>>> JobExecutionResult jobResult = env.execute(jobName);
>>>
>>> RestClient restClient = new RestClient(
>>>
>>> RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>>> scheduledExecutorService);
>>>
>>> URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>>
>>>
>>> System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>>>
>>>
>>> long currentWatermark = 0;
>>>
>>> long lastInputWatermark = Arrays.stream(functions)
>>>
>>>   .map(f -> f.getEndWatermark())
>>>
>>>   .mapToLong(l -> l)
>>>
>>>   .max().getAsLong();
>>>
>>> for (int i = 0; i < 3 ; i++) {
>>>
>>> //while (currentWatermark < (lastInputWatermark + 1000)) {
>>>
>>>   JobDetailsHeaders getVertexHeaders =
>>> JobDetailsHeaders.getInstance();
>>>
>>>   JobMessageParameters getVertexParams =
>>> getVertexHeaders.getUnresolvedMessageParameters();
>>>
>>>   

Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Marco Villalobos
Thanks!

However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
does not exist in Flink 1.12.

Is that property supported with the string "commit.offsets.on.checkpoints"?

How do I configure that behavior so that offsets get committed on
checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
default behavior with checkpoints?




On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:

> Hi,
>
> Maybe you can write like this :
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
> "true");
>
> Other additional properties could be found here :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 11:08 AM:
>
>> Thank you for the information.  That still does not answer my question
>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>> that consumer should commit offsets back to Kafka on checkpoints?
>>
>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>> method.
>>
>> But now that I am using KafkaSourceBuilder, how do I configure that
>> behavior so that offsets get committed on checkpoints?  Or is that the
>> default behavior with checkpoints?
>>
>> -Marco
>>
>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> Flink 1.14 release note states about this. See [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>
>>> Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:
>>>
 Hi everybody,

 I am using Flink 1.12 and migrating my code from using
 FlinkKafkaConsumer to using the KafkaSourceBuilder.

 FlinkKafkaConsumer has the method

 /**
>  * Specifies whether or not the consumer should commit offsets back to
> Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for
> the job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) /
> "enable.auto.commit" (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)


 How do I setup that parameter when using the KafkaSourceBuilder? If I
 already have checkpointing configured, is it necessary to setup "commit
 offsets on checkpoints"?

 The Flink 1.12 documentation does not discuss this topic, and the Flink
 1.14 documentation says little about it.

  For example, the Flink 1.14 documentation states:

 Additional Properties
> In addition to properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using
> setProperties(Properties) and setProperty(String, String). KafkaSource has
> following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming
> offsets to Kafka brokers on checkpoint


 And the 1.12 documentation states:

 With Flink’s checkpointing enabled, the Flink Kafka Consumer will
> consume records from a topic and periodically checkpoint all its Kafka
> offsets, together with the state of other operations. In case of a job
> failure, Flink will restore the streaming program to the state of the
> latest checkpoint and re-consume the records from Kafka, starting from the
> offsets that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the
> program may have to go back at most, in case of a failure. To use fault
> tolerant Kafka Consumers, checkpointing of the topology needs to be 
> enabled
> in the job.
> If checkpointing is disabled, the Kafka consumer will periodically
> commit the offsets to Zookeeper.


 Thank you.

 Marco





Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Till Rohrmann
Great news, Yingjie. Thanks a lot for sharing this information with the
community and kudos to all the contributors of the external shuffle service
:-)

Cheers,
Till

On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:

> Hi dev & users,
>
> We are happy to announce the open source of remote shuffle project [1] for
> Flink. The project is originated in Alibaba and the main motivation is to
> improve batch data processing for both performance & stability and further
> embrace cloud native. For more features about the project, please refer to
> [1].
>
> Before going open source, the project has been used widely in production
> and it behaves well on both stability and performance. We hope you enjoy
> it. Collaborations and feedbacks are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>



Re: Query regarding exceptions API(/jobs/:jobid/exceptions)

2021-11-30 Thread Matthias Pohl
Thanks for sharing this information. I verified that it's a bug in Flink.
The issue is that the Exceptions you're observing are happening while the
job is initialized. We're not setting the exception history properly in
that case.

Matthias

On Mon, Nov 29, 2021 at 2:08 PM Mahima Agarwal 
wrote:

> Hi Matthias,
>
> We have created a JIRA ticket for this issue. Please find the jira id below
>
> https://issues.apache.org/jira/browse/FLINK-25096
>
> Thanks
> Mahima
>
> On Mon, Nov 29, 2021 at 2:24 PM Matthias Pohl 
> wrote:
>
>> Thanks Mahima,
>> could you create a Jira ticket and, if possible, add the Flink logs? That
>> would make it easier to investigate the problem.
>>
>> Best,
>> Matthias
>>
>> On Sun, Nov 28, 2021 at 7:29 AM Mahima Agarwal 
>> wrote:
>>
>>> Thanks Matthias
>>>
>>> But we have observed the below 2 exceptions are coming in
>>> root-exceptions but not in exceptionHistory:
>>>
>>> caused by: java.util.concurrent.CompletionException:
>>> java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find
>>> checkpoint or savepoint file/directory
>>> 'C:\Users\abc\Documents\checkpoints\a737088e21206281db87f6492bcba074' on
>>> file system 'file'.
>>>
>>> Caused by: java.lang.IllegalStateException: Failed to rollback to
>>> checkpoint/savepoint
>>> file:/mnt/c/Users/abc/Documents/checkpoints/a737088e21206281db87f6492bcba074/chk-144.
>>> Thanks and Regards
>>> Mahima Agarwal
>>>
>>>
>>> On Fri, Nov 26, 2021, 13:19 Matthias Pohl 
>>> wrote:
>>>
 Just to add a bit of context: The first-level members all-exceptions,
 root-exceptions, truncated and timestamp have been around for a longer
 time. The exceptionHistory was added in Flink 1.13. As part of this change,
 the aforementioned members were deprecated (see [1]). We kept them for
 backwards-compatibility reasons.

 That said, root-exception and all-exceptions are also represented in
 the exceptionHistory.

 Matthias

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-exceptions

 On Thu, Nov 25, 2021 at 12:14 PM Chesnay Schepler 
 wrote:

> root-exception: The last exception that caused a job to fail.
> all-exceptions: All exceptions that occurred the last time a job
> failed. This is primarily useful for completed jobs.
> exception-history: Exceptions that previously caused a job to fail.
>
> On 25/11/2021 11:52, Mahima Agarwal wrote:
>
> Hi Team,
>
> Please find the query below regarding exceptions
> API(/jobs/:jobid/exceptions)
>
>
> In response of above rest api:
>
>
> Users are getting 3 types of exceptions:
> 1. exceptionHistory
> 2. all-exceptions
> 3. root-exception
>
>
> What is the purpose of the above 3 exceptions?
>
>
> Any leads would be appreciated.
>
> Thanks
> Mahima
>
>


Re: Flink accessing two HDFS clusters

2021-11-30 Thread David Morávek
Hi chenqizhu,

this exception doesn't seem to come from Flink, but rather from a YARN
container bootstrap.

When YARN container starts up, it needs to download resources from HDFS
(your job archives / configuration / distributed cache / ...) which are
necessary for startup of the user application (in Flink case JobManager /
TaskManager). As far as I can tell, the affected NodeManager tries to pull
data from a filesystem it doesn't have access to (refer to hdfs-site.conf /
yarn logs on the particular node).

question : Why cannot flink-conf(flink.hadoop.*) overwrite the
> configurations read by YARN NodeManager ?
>

In this case the exception happens before any Flink code is executed by the
NodeManager.

I think NM logs can help you identify which files are not accessible by
YARN, that could narrow it down a bit.

Best,
D.

On Tue, Nov 30, 2021 at 9:23 AM chenqizhu  wrote:

> hi,
> Flink version 1.13 supports configuration of Hadoop properties in
> flink-conf.yaml via flink.hadoop.*. There is A requirement to write
> checkpoint to HDFS with SSDS (called Bcluster ) to speed checkpoint
> writing, but this HDFS cluster is not the default HDFS in the flink client
> (called Acluster ). Yaml is configured with nameservices for cluster A and
> cluster B, which is similar to HDFS federated mode.
>
> The configuration is as follows:
>
> flink.hadoop.dfs.nameservices: ACluster,BCluster
> flink.hadoop.fs.defaultFS: hdfs://BCluster
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
> However, an error occurred during the startup of the job, which is
> reported as follows:
>
> (change configuration items to A flink local client default HDFS cluster,
> the operation can be normal boot:  flink.hadoop.fs.DefaultFS: hdfs: / /
> ACluster)
>
>
> Failing this attempt.Diagnostics: [2021-11-30 
> 15:39:15.582]java.net.UnknownHostException: BCluster
>
> java.lang.IllegalArgumentException: java.net.UnknownHostException: BCluster
>
>   at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>   at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>   at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>   at 
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>   at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at 

Re: how to run streaming process after batch process is completed?

2021-11-30 Thread Alexander Preuß
Hi Vtygoss,

Can you explain a bit more about your ideal pipeline? Is the batch data
bounded data or could you also process it in streaming execution mode? And
is the streaming data derived from the batch data or do you just want to
ensure that the batch has been finished before running the processing of
the streaming data?

Best Regards,
Alexander

(sending again because I accidentally left out the user ml in the reply on
the first try)

On Tue, Nov 30, 2021 at 12:38 PM vtygoss  wrote:

> Hi, community!
>
>
> By Flink, I want to unify batch process and streaming process in data
> production pipeline. Batch process is used to process inventory data, then
> streaming process is used to process incremental data. But I meet a
> problem, there is no  state in batch and the result is error if i run
> stream process directly.
>
>
> So how to run streaming process accurately  after batch process is
> completed?   Is there any doc or demo to handle this scenario?
>
>
> Thanks for your any reply or suggestion!
>
>
> Best Regards!
>
>
>
>
>


[ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi dev & users,

We are happy to announce the open-sourcing of the remote shuffle project [1] for
Flink. The project originated in Alibaba, and the main motivation is to
improve batch data processing in both performance and stability and to further
embrace cloud native. For more details about the project's features, please refer to
[1].

Before going open source, the project had already been used widely in production,
and it behaves well in terms of both stability and performance. We hope you enjoy
it. Collaboration and feedback are highly appreciated.

Best,
Yingjie on behalf of all contributors

[1] https://github.com/flink-extended/flink-remote-shuffle


Re: Job cancellation issue with Flink 1.13.3 on native Kubernetes

2021-11-30 Thread Yang Wang
If you are using native mode, then by design all Kubernetes resources are released after the job is cancelled, and the HA data is cleaned up automatically as well [1].

The job being pulled up again is not expected behavior; please share the JobManager logs so we can take a closer look.

[1].
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up

Best,
Yang

研发-江志洋 wrote on Tue, Nov 30, 2021 at 9:39 AM:

> Hi, I have recently been deploying jobs with Flink 1.13.3 in application mode on native Kubernetes, and there are a few things I don't quite understand:
> 1. When I click cancel on the UI, the job is immediately pulled up again, so cancelling does not actually take effect?
> 2. After deleting the job with the kubectl delete deployment/cluster_id command and then starting it again with bin/flink
> run-application, it automatically recovers from the last successful checkpoint. Is there a way to not recover from the checkpoint?
>
>
> Looking forward to your reply!


Re: How to Fan Out to 100s of Sinks

2021-11-30 Thread Fabian Paul
Hi Shree,

I think for every Iceberg Table you have to instantiate a different
sink in your program. You basically have one operator before your
sinks that decides where to route the records. You probably end up
with one Iceberg sink for each of your customers. Maybe you can take a
look at the DemultiplexingSink [1] but unfortunately, there has not
been much progress yet.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-24493
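To make the "one routing operator in front of per-customer sinks" idea concrete, a rough
sketch using side outputs; the customer-id extraction and the sink wiring are placeholders,
not Iceberg's actual API:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CustomerRouter extends ProcessFunction<String, String> {

    private final Map<String, OutputTag<String>> tagsByCustomer;

    public CustomerRouter(List<String> customerIds) {
        this.tagsByCustomer = customerIds.stream()
                .collect(Collectors.toMap(
                        id -> id,
                        id -> new OutputTag<String>("customer-" + id, Types.STRING)));
    }

    @Override
    public void processElement(String jsonLine, Context ctx, Collector<String> out) {
        OutputTag<String> tag = tagsByCustomer.get(extractCustomerId(jsonLine));
        if (tag != null) {
            ctx.output(tag, jsonLine); // route to this customer's side output
        }                              // else: unknown customer, drop or send to a dead-letter output
    }

    private static String extractCustomerId(String jsonLine) {
        // Placeholder: real code would parse the customer field out of the jsonl record.
        int comma = jsonLine.indexOf(',');
        return comma < 0 ? jsonLine : jsonLine.substring(0, comma);
    }

    public static void wireSinks(DataStream<String> lines, List<String> customerIds) {
        SingleOutputStreamOperator<String> routed = lines.process(new CustomerRouter(customerIds));
        for (String id : customerIds) {
            DataStream<String> perCustomer =
                    routed.getSideOutput(new OutputTag<String>("customer-" + id, Types.STRING));
            // Attach the sink configured for this customer's table/bucket here
            // (e.g. the Iceberg sink for that customer); print() is just a stand-in.
            perCustomer.print();
        }
    }
}
```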

On Mon, Nov 29, 2021 at 7:11 PM SHREEKANT ANKALA  wrote:
>
> Hi,
> Here is our scenario:
>
> We have a system that generates data in a jsonl file for all of customers 
> together. We now need to process this jsonl data and conditionally distribute 
> the data to individual customer based on their preferences as Iceberg Tables. 
> So every line in the jsonl file, the data will end up one of the customers S3 
> bucket as an Iceberg table row. We were hoping to continue using Flink for 
> this use case by just one job doing a conditional sink, but we are not sure 
> if that would be the right usage of Flink.
>
> Thanks,
> Shree
> 
> From: Fabian Paul 
> Sent: Monday, November 29, 2021 1:57 AM
> To: SHREEKANT ANKALA 
> Cc: user@flink.apache.org 
> Subject: Re: How to Fan Out to 100s of Sinks
>
> Hi,
>
> What do you mean by "fan out" to 100 different sinks? Do you want to
> replicate the data in all buckets or is there some conditional
> branching logic?
>
> In general, Flink can easily support 100 different sinks but I am not
> sure if this is the right approach for your use case. Can you clarify
> your motivation and tell us a bit more about the exact scenario?
>
> Best,
> Fabian
>
>
>
> On Mon, Nov 29, 2021 at 1:11 AM SHREEKANT ANKALA  wrote:
> >
> > Hi all, we current have a Flink job that retrieves jsonl data from GCS and 
> > writes to Iceberg Tables. We are using Flink 13.2 and things are working 
> > fine.
> >
> > We now have to fan out that same data in to 100 different sinks - Iceberg 
> > Tables on s3. There will be 100 buckets and the data needs to be sent to 
> > each of these 100 different buckets.
> >
> > We are planning to add a new Job that will write to 1 sink at a time for 
> > each time it is launched. Is there any other optimal approach possible in 
> > Flink to support this use case of 100 different sinks?


how to run streaming process after batch process is completed?

2021-11-30 Thread vtygoss
Hi, community!


With Flink, I want to unify batch processing and stream processing in a data 
production pipeline. Batch processing handles the inventory (historical) data, and 
stream processing then handles the incremental data. But I have hit a problem: 
the batch run leaves no state behind, so the result is wrong if I run the streaming process 
directly. 


So how can I run the streaming process correctly after the batch process has completed? 
Is there any doc or demo for handling this scenario?


Thanks for any reply or suggestion!


Best Regards!

Re: Parquet schema per bucket in Streaming File Sink

2021-11-30 Thread Francesco Guardiani
Hi Zack,

> I want to customize this job to "explode" the map as column names and
> values

You can do this in a SELECT statement by manually extracting the map values
using the map access built-in function,
e.g.:

SELECT mymap['a'] AS a, mymap['b'] AS b

> specifically the BucketAssigner and the CheckpointRollingPolicy both
appear to be required to have a bucketId of a String.

I wonder if what you're looking for is the PARTITIONED BY feature:

CREATE TABLE MySinkTable (
  ...) PARTITIONED BY (partitionKey1, partitionKey2)

Does this solve your use case?

FG


On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel  wrote:

> Hey all,
>
> I have a job which writes data that is a similar shape to a location in
> s3. Currently it writes a map of data with each row. I want to customize
> this job to "explode" the map as column names and values, these are
> consistent for a single bucket. Is there any way to do this? Provide a
> custom parquet schema per bucket within a single dynamic sink?
>
> I've started looking at the changes within the main codebase to make this
> feasible. It seems straightforward to provide the bucketId to the
> writerFactory, and the bucketId could be a type containing the relevant
> schema information.
> Although it appears that the BulkFormatBuilder has several spots where
> BucketId appears to be required to be a String: specifically
> the BucketAssigner and the CheckpointRollingPolicy both appear to be
> required to have a bucketId of a String.
>
> I'm curious if this is a change the community would be open to, and or if
> there is another way to accomplish what I'm looking for that I've missed.
>
> Thanks,
> Zack
>
>


Flink accessing two HDFS clusters

2021-11-30 Thread chenqizhu
hi,
Flink 1.13 supports configuring Hadoop properties in 
flink-conf.yaml via flink.hadoop.*. We have a requirement to write checkpoints 
to an HDFS cluster with SSDs (called B cluster) to speed up checkpoint writing, but this HDFS 
cluster is not the default HDFS of the Flink client (called A cluster). flink-conf.yaml 
is configured with nameservices for both cluster A and cluster B, similar 
to HDFS federation mode.

The configuration is as follows:

flink.hadoop.dfs.nameservices: ACluster,BCluster
flink.hadoop.fs.defaultFS: hdfs://BCluster

flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

However, an error occurred during the startup of the job, which is reported as 
follows:

(If the configuration is switched back to the Flink client's default HDFS cluster, i.e. 
flink.hadoop.fs.defaultFS: hdfs://ACluster, the job starts normally.)


Failing this attempt.Diagnostics: [2021-11-30 
15:39:15.582]java.net.UnknownHostException: BCluster

java.lang.IllegalArgumentException: java.net.UnknownHostException: BCluster

at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
at 
org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at 
org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: BCluster

... 28 more
Caused by: BCluster
java.net.UnknownHostException: BCluster
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
at 
org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at