Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-27 Thread 刘建刚
  Thank you  very much. It solved my problem.

> On Apr 22, 2020, at 5:15 PM, Jingsong Li [via Apache Flink User Mailing List archive.]
>  wrote:
> 
> Hi,
> 
> Sorry for the mistake: [1] is related, but this bug was only fully fixed in
> [2], so the safe versions are 1.9.3+ and 1.10.1+, which means there is no safe
> released version at the moment.
> 
> 1.10.1 will be released very soon.
> 
> [1]https://issues.apache.org/jira/browse/FLINK-13702 
> 
> [2]https://issues.apache.org/jira/browse/FLINK-16242 
> 
> 
> Best,
> Jingsong Lee
> 
> On Wed, Apr 22, 2020 at 4:50 PM Jingsong Li <[hidden email] 
> > wrote:
> Hi,
> 
> As Jark said, it may be FLINK-13702 [1]. It has been fixed in 1.9.2 and
> later versions.
> 
> > Can it be a thread-safe problem or something else?
> 
> Yes, it is a thread-safety problem with lazy materialization.
> 
> [1]https://issues.apache.org/jira/browse/FLINK-13702 
> 
> 
> Best,
> Jingsong Lee
> 
> On Tue, Apr 21, 2020 at 1:21 PM forideal <[hidden email] 
> > wrote:
> Hi Kurt,
> I had the same error.
> 
>sql:
>insert into
>   dw_access_log
> select
>   get_json_value(query_nor, query_nor_counter) as `value`
> from
>   ods_access_log_source
> group by
>   tumble (time_key, interval '1' MINUTE),
>   group_key
> 
> get_json_value
> public class GetJsonValue extends AggregateFunction<String, Map<String, Long>> {
> @Override
> public boolean isDeterministic() {
> return false;
> }
> 
> @Override
> public Map<String, Long> createAccumulator() {
> return new HashMap<>();
> }
> 
> @Override
> public void open(FunctionContext context) throws Exception {
> 
> }
> 
> public void accumulate(Map<String, Long> datas, String key, long value) {
> datas.put(key, value);
> }
> 
> @Override
> public String getValue(Map<String, Long> acc) {
> return JSON.toJSONString(acc);
> }
> 
> 
> @Override
> public TypeInformation<String> getResultType() {
> return Types.STRING;
> }
> 
> }
> 
> Best forideal
> 
> 
> 
> At 2020-04-21 10:05:05, "Kurt Young" <[hidden email] 
> > wrote:
> 
> Thanks, once you can reproduce this issue locally, please open a jira with 
> your testing program.
> 
> Best,
> Kurt
> 
> 
> On Tue, Apr 21, 2020 at 8:48 AM 刘建刚 <[hidden email] 
> > wrote:
> Thank you. It is an online job and my input is huge. I checked the trace and
> found that the array is resized when its capacity is not enough. The code is as
> below:
> 
> public void add (int value) {
>int[] items = this.items;
>if (size == items.length) items = resize(Math.max(8, (int)(size * 1.75f)));
>items[size++] = value;
> }
> 
> Only the blink planner has this error. Could it be a thread-safety problem or
> something else? I will try to reproduce it locally.
> 
>> On Apr 21, 2020, at 12:20 AM, Jark Wu-3 [via Apache Flink User Mailing List archive.]
>> <[hidden email] > wrote:
>> 
>> Hi,
>> 
>> Are you using a version < 1.9.2? From the exception stack, it looks like it is
>> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
>> Could you try it with 1.9.2?
>> 
>> Best,
>> Jark
>> 
>> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email] <>> wrote:
>> Can you reproduce this in a local program with mini-cluster?
>> 
>> Best,
>> Kurt
>> 
>> 
>> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email] <>> wrote:
>> You can read this for this type error.
>> 
>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>  
>> 
>> 
>> I would suggest you set breakpoints in your code and step through it. This
>> should show you which array variable is being passed a null argument when the
>> array variable is not nullable.
>> 
>> 
>> 
>> 
>> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email] <>> wrote:
>>   I am using Roaring64NavigableMap to compute UV. It works with the flink
>> planner but not with the blink planner. The SQL is as follows:
>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, 
>> A, B, C, D,
>> E, uv(bitmap(id)) as bmp
>> FROM person
>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>> 
>>   The udf is as following:
>> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>@Override
>>public Roaring64NavigableMap createAccumulator() {
>>   return new Roaring64NavigableMap();
>>}
>> 
>>@Override
>>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>>   return accumulator;
>>}
>> 
>>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>   bitmap.add(id);
>>}
>> }
>> public static class UV extends ScalarFunction {
>>public long e
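The quoted UV function is cut off above. A hypothetical sketch of what such a scalar function typically looks like (this is an assumption, not the original code): it simply returns the cardinality of the bitmap built by the Bitmap aggregate.

import org.apache.flink.table.functions.ScalarFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

// Hypothetical sketch only; the original implementation is truncated in the archive.
public class UV extends ScalarFunction {
   public long eval(Roaring64NavigableMap bitmap) {
      // unique-visitor count = cardinality of the accumulated bitmap
      return bitmap.getLongCardinality();
   }
}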

Blink window and cube

2020-04-27 Thread 刘建刚
  Hi, I find that the blink planner supports CUBE. CUBE can be used together
with a regular field but not with a window. For example, the following SQL is not supported:
SELECT A, B, sum(C)
FROM person
GROUP BY cube(A, B), TUMBLE(curTimestamp, interval '1' minute)
  The following error is reported. Is there a way to combine CUBE and a
window? Thank you very much.

Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE(TIMESTAMP(3) *ROWTIME*, INTERVAL SECOND(3) NOT NULL)
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAg
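One possible workaround, sketched here as an untested assumption rather than a confirmed answer: run the tumbling-window aggregation in an inner query, then apply CUBE over the pre-aggregated result in the outer query. Table and column names are taken from the question above; `tableEnv` is assumed to be a StreamTableEnvironment with the `person` table registered.

// Untested sketch: window-aggregate first, then CUBE over the windowed result.
// Assumes org.apache.flink.table.api.Table is imported and `curTimestamp` is a rowtime attribute.
Table result = tableEnv.sqlQuery(
    "SELECT A, B, window_start, SUM(sum_c) AS sum_c " +
    "FROM ( " +
    "  SELECT A, B, " +
    "         TUMBLE_START(curTimestamp, INTERVAL '1' MINUTE) AS window_start, " +
    "         SUM(C) AS sum_c " +
    "  FROM person " +
    "  GROUP BY A, B, TUMBLE(curTimestamp, INTERVAL '1' MINUTE) " +
    ") t " +
    "GROUP BY CUBE(A, B), window_start");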

join state TTL

2020-04-27 Thread lec ssmi
Hi:
  When a stream is joined with another stream, the cached stream data
will be saved as state and deleted as the watermark advances.
  I found that there is also a parameter that can set the state expiration
time, such as StateTtlConfig in the DataStream API and TableConfig in the Table API & SQL.
This setting is effective for the state of the group-by operator, and the state
TTL seems to be based on processing time. If the configured TTL has been
reached but the watermark has not advanced to the edge, will the state of the
join be cleared? What is the relationship between StateTtlConfig and
TableConfig? If I use StateTtlConfig and program with the Table API, will the
configuration take effect?

  Best regards
  Lec Ssmi
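For reference on the question above, a minimal sketch (assuming Flink 1.10 APIs; class and variable names are illustrative) of where the two settings live: StateTtlConfig is attached to a state descriptor in the DataStream API, while for the Table API & SQL the idle state retention is set once on the TableConfig.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class StateRetentionSketch {
    public static void main(String[] args) {
        // DataStream API: StateTtlConfig is attached to a state descriptor inside a user function.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
        descriptor.enableTimeToLive(ttlConfig);

        // Table API & SQL: idle state retention is configured once on the TableConfig and
        // applies to the state kept by SQL operators such as group-by aggregations.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        TableConfig tableConfig = tableEnv.getConfig();
        tableConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2));
    }
}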


ML/DL via Flink

2020-04-27 Thread m@xi
Hello Flinkers,

I am building a *streaming* prototype system on top of Flink, and ideally I want
to enable ML training (if possible DL) in Flink. It would be nice to
lay out all the existing libraries that provide primitives that enable the
training of ML models.

I assume it is more efficient to do all the training in Flink (somehow)
rather than (re)training a model in TensorFlow (or PyTorch) and porting it
to a Flink job. For instance,
https://stackoverflow.com/questions/59563265/embedd-existing-ml-model-in-apache-flink
Especially in streaming ML systems, the training and the serving should both
happen in an online fashion.

To initialize the pool, I have found the following options that run on top
of Flink i.e., leveraging the engine for distributed and scalable ML
training.

1) *FlinkML(DataSet API)*
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html
This is not for streaming ML as it sits on top of the DataSet API. In addition,
the library was recently dropped
https://stackoverflow.com/questions/58752787/what-is-the-status-of-flinkml
but there is ongoing development (??) of a new library on top of the Table API.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
https://issues.apache.org/jira/browse/FLINK-12470
which is not in the 1.10 distribution.

2) *Apache Mahout* https://mahout.apache.org/
I thought it was long dead, but recently they started developing it again. 

3) *Apache SAMOA* https://samoa.incubator.apache.org/
They are developing it, but slowly. It has been an incubator project since 2013.

4) *FlinkML Organization* https://github.com/FlinkML
This one has interesting repos, e.g. flink-jpmml
https://github.com/FlinkML/flink-jpmml
and an implementation of a parameter server
https://github.com/FlinkML/flink-parameter-server,
which is really useful for enabling distributed training, in the sense
that the model is also distributed during training.
Though, the repo(s) are not very active.

5) *DeepLearning4j *https://deeplearning4j.org/
This is a distributed deep learning library that was said to also work
on top of Flink (here
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-support-for-DeepLearning4j-or-other-deep-learning-library-td12157.html).
I am not interested at all in GPU support, but I am wondering if anyone has
successfully used this one on top of Flink.

6) *Proteus - SOLMA* https://github.com/proteus-h2020/proteus-solma
It is a scalable online learning library on top of Flink, and is the output
of an H2020 research project called PROTEUS.
http://www.bdva.eu/sites/default/files/hbouchachia_sacbd-ecsa18.pdf

7) *Alibaba - ALink*
https://github.com/alibaba/Alink/blob/master/README.en-US.md
A machine learning algorithm platform from Alibaba which is actively
maintained.

These are all the systems I have found that do ML using Flink's
engine.

*Questions*
(i)  Has anyone used them?
(ii) More specifically, has someone implemented *Stochastic Gradient
Descent, Skip-gram models, Autoencoders* with any of the above tools (or
other)?

*Remarks*
If you have any experiences/comments/additions to share, please do! Gotta
Catch 'Em All!

Best,
Max




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Jark Wu
Yes.
This is an optimization: if the previous result is the same as the new
result, then the operator will not emit records for the new result.

Best,
Jark

On Tue, 28 Apr 2020 at 11:05, izual  wrote:

> Thank you, Jark.
>
> I also have tried COUNT DISTINCT ^_^; the only problem is that the
> `tblEvent` may generate two records with the same id, such as:
> t1: {"id": 1}
> t2: {"id": 1}
>
> But the SQL will only output one result, when the t1 record is received.
> I think maybe some optimization works in the background when the result does not
> change?
>
>
> At 2020-04-28 10:53:34, "Jark Wu"  wrote:
>
> Hi izual,
>
> In such a case, I think you should try COUNT DISTINCT instead of COUNT.
> DISTINCT will help to deduplicate, so no matter how many times you
> receive id=1, the region count should always be 3.
>
> SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN
> tblDim FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP
> BY tblEvent.id
>
> Best,
> Jark
>
>
> On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:
>
>> Hi izual,
>>
>> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
>> The state is not managed in the UDAF; it's managed by the aggregation operator,
>> and
>> your UDAF's accumulator will be handled by the operator using state.
>>
>> izual wrote on Mon, Apr 27, 2020 at 11:21 PM:
>>
>>> Thanks, Benchao.
>>>
>>> Maybe change the dimension table will work, but this changes a lot,
>>> include `size/count` is not the column of one dim table.
>>> I notice that user can define Aggregate Functions[1],  but this page
>>> also said:
>>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>>> mechanism and restored
>>> So is it right to implement my own COUNT/SUM UDF?
>>>
>>> [1].
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>>
>>>
>>>
>>>
>>>
>>>
>>> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>>>
>>> Hi,
>>>
>>> There is indeed a state for the aggregation result, however we cannot
>>> disable it, it's by design.
>>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>>> state will be kept.
>>> If you can ensure the time gap between two records of the same id larger
>>> than, for example
>>> 1 min, then setting retention time to 1min can resolve your issue.
>>> If not, maybe you need to change your dimension table, making it return
>>> the count directly instead
>>> of return the details.
>>>
>>> izual wrote on Mon, Apr 27, 2020 at 5:06 PM:
>>>
 I implements my DimTable by extends `LookupTableSource`[1], which
 stores data like:

 id=1 -> (SH, BJ, SD)

 id=2 -> (...)

 and then extends `TableFunction` to return the value corresponding to
 the lookup keys,and maybe return multi rows, for example, when lookupkeys
 is id=1, then in the `TableFunction.eval`

 ```

 collect('SH')

 collect('BJ')

 collect('SD')

 ```


 Now I want to get the region'count by id, which is from the
 tblEvent.id, sql is :


 SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
 SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
 tblEvent.id


 I expect the result of COUNT is always 3 for id = 1, no matter the id=1
 appears how many times.

 but the actual result is : 3, 6, 9, ...


 I think this is bcz the state mechanism behind COUNT, how to turn this
 off?

 Or what's the correct use for this?
 StreamQueryConfig.maxIdleStateRetentionTime or something?


 The reason not using state in flink:
 http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups




>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>
>
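A minimal sketch of Benchao's suggestion quoted above (have the lookup function return the count directly instead of one row per region). The class name and the in-memory map are illustrative stand-ins, not the original code.

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.HashMap;
import java.util.Map;

// Illustrative lookup function that emits the pre-computed region count per id,
// so the query no longer needs COUNT over multiple emitted rows.
public class RegionCountLookup extends TableFunction<Integer> {

    private transient Map<Long, Integer> regionCounts;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Stand-in for loading/connecting to the real dimension store.
        regionCounts = new HashMap<>();
        regionCounts.put(1L, 3); // e.g. id=1 -> {SH, BJ, SD} -> 3
    }

    public void eval(Long id) {
        Integer count = regionCounts.get(id);
        if (count != null) {
            collect(count); // exactly one row per lookup: the count itself
        }
    }
}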
>


Re: RocksDB default logging configuration

2020-04-27 Thread Yun Tang
Hi Bajaj

Current "state.checkpoints.dir" defines cluster-wide location for cluster and 
each job would create the specific checkpoint location under it with job-id 
sub-directory. It is the same for the checkpoint URL in RocksDB.

And the configuration option "state.backend.rocksdb.localdir" [1] should work 
for RocksDB in Flink-1.7.1.

[1] 
https://github.com/apache/flink/blob/808cc1a23abb25bd03d24d75537a1e7c6987eef7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L285-L301

Best
Yun Tang

From: Bajaj, Abhinav 
Sent: Tuesday, April 28, 2020 8:03
To: user@flink.apache.org 
Cc: Chesnay Schepler 
Subject: Re: RocksDB default logging configuration


It seems requiring the checkpoint URL to create the RocksDBStateBackend mixes
the operational aspects of the cluster into the job.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(“CHECKPOINT_URL”, 
true);
stateBackend.setDbStoragePath(“DB_STORAGE_PATH”);



Also, I noticed that the RocksDBStateBackend picks up the savepoint dir from
the property “state.savepoints.dir” of the flink-conf.yaml file but does not pick
up “state.backend.rocksdb.localdir”.

So I had to set from the job as above.



I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.

I am using Flink 1.7.1.



Thanks Chesnay for your response below.



~ Abhinav Bajaj



From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration






AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.



FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.



The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.





On 23/04/2020 03:24, Bajaj, Abhinav wrote:

Bumping this one again to catch some attention.



From: "Bajaj, Abhinav" 
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" 

Subject: RocksDB default logging configuration



Hi,



Some of our teams ran into the disk space issues because of RocksDB default 
logging configuration - 
FLINK-15068.

It seems the workaround suggested uses the OptionsFactory to set some of the 
parameters from inside the job.



Since we provision the Flink cluster(version 1.7.1) for the teams, we control 
the RocksDB statebackend configuration from flink-conf.yaml.

And it seems there isn’t any related RocksDB 
configuration
 to set in flink-conf.yaml.



Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?



Appreciate the help!



~ Abhinav Bajaj



PS:  Sharing below snippet as desired option if possible -



StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();

stateBackend.setOptions(new OptionsFactory() {

@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
  dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}

});








Re:Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
Thank you, Jark.


I also have tried COUNT DISTINCT ^_^, the only problem is that if the 
`tblEvent` generates two simple id, such as:
t1: {"id": 1}
t2: {"id": 1}


But the sql will only output one result only on t1 record received.
I think maybe some optimizer worker background when the result does not change?




At 2020-04-28 10:53:34, "Jark Wu"  wrote:

Hi izual,


In such case, I think you should try COUNT DISTINCT instead of COUNT. 
DISTINCT will help to deduplicate, so no matter how many times you received 
id=1, the region count should always 3. 


SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim FOR 
SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id


Best,
Jark




On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:

Hi izual,


IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
The state is not managed in UDAF, it's managed by aggregation operator, and 
your UDAF's aggregator will be handled by operator using state.


izual wrote on Mon, Apr 27, 2020 at 11:21 PM:

Thanks, Benchao.


Maybe change the dimension table will work, but this changes a lot, include 
`size/count` is not the column of one dim table.
I notice that user can define Aggregate Functions[1],  but this page also said:
> Accumulators are automatically backup-ed by Flink’s checkpointing mechanism 
> and restored
So is it right to implement my own COUNT/SUM UDF?


[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions

At 2020-04-27 17:32:14, "Benchao Li"  wrote:

Hi,


There is indeed a state for the aggregation result, however we cannot disable 
it, it's by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state will 
be kept.
If you can ensure the time gap between two records of the same id larger than, 
for example 
1 min, then setting retention time to 1min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the 
count directly instead 
of return the details.


izual wrote on Mon, Apr 27, 2020 at 5:06 PM:


I implements my DimTable by extends `LookupTableSource`[1], which stores data 
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extends `TableFunction` to return the value corresponding to the 
lookup keys,and maybe return multi rows, for example, when lookupkeys is id=1, 
then in the `TableFunction.eval`

```

collect('SH')

collect('BJ')

collect('SD')

```




Now I want to get the region'count by id, which is from the tblEvent.id, sql is 
:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT is always 3 for id = 1, no matter the id=1 appears 
how many times.

but the actual result is : 3, 6, 9, ...




I think this is bcz the state mechanism behind COUNT, how to turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason not using state in flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Jark Wu
Hi izual,

In such case, I think you should try COUNT DISTINCT instead of COUNT.
DISTINCT will help to deduplicate, so no matter how many times you received
id=1, the region count should always 3.

SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim
FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
tblEvent.id

Best,
Jark


On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:

> Hi izual,
>
> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
> The state is not managed in UDAF, it's managed by aggregation operator,
> and
> your UDAF's aggregator will be handled by operator using state.
>
> izual wrote on Mon, Apr 27, 2020 at 11:21 PM:
>
>> Thanks, Benchao.
>>
>> Maybe change the dimension table will work, but this changes a lot,
>> include `size/count` is not the column of one dim table.
>> I notice that user can define Aggregate Functions[1],  but this page also
>> said:
>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>> mechanism and restored
>> So is it right to implement my own COUNT/SUM UDF?
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>
>>
>>
>>
>>
>>
>> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>>
>> Hi,
>>
>> There is indeed a state for the aggregation result, however we cannot
>> disable it, it's by design.
>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>> state will be kept.
>> If you can ensure the time gap between two records of the same id larger
>> than, for example
>> 1 min, then setting retention time to 1min can resolve your issue.
>> If not, maybe you need to change your dimension table, making it return
>> the count directly instead
>> of return the details.
>>
>> izual wrote on Mon, Apr 27, 2020 at 5:06 PM:
>>
>>> I implements my DimTable by extends `LookupTableSource`[1], which stores
>>> data like:
>>>
>>> id=1 -> (SH, BJ, SD)
>>>
>>> id=2 -> (...)
>>>
>>> and then extends `TableFunction` to return the value corresponding to
>>> the lookup keys,and maybe return multi rows, for example, when lookupkeys
>>> is id=1, then in the `TableFunction.eval`
>>>
>>> ```
>>>
>>> collect('SH')
>>>
>>> collect('BJ')
>>>
>>> collect('SD')
>>>
>>> ```
>>>
>>>
>>> Now I want to get the region'count by id, which is from the tblEvent.id,
>>> sql is :
>>>
>>>
>>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>>> tblEvent.id
>>>
>>>
>>> I expect the result of COUNT is always 3 for id = 1, no matter the id=1
>>> appears how many times.
>>>
>>> but the actual result is : 3, 6, 9, ...
>>>
>>>
>>> I think this is bcz the state mechanism behind COUNT, how to turn this
>>> off?
>>>
>>> Or what's the correct use for this?
>>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>>
>>>
>>> The reason not using state in flink:
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>>
>>
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Flink Lookup Filter Pushdown

2020-04-27 Thread Jark Wu
Hi forideal,

Currently, dimension table join requires at least one join key. That is the
`u_id` in your example. The join key will be used as the lookup key.
If you have some additional filters on the dimension table, that's fine; Flink
will help to filter the fetched data.
That means Flink supports following dimension join query:

select score_a ... left join ... source_table.u_id=dim_u_score.u_id
where dim_u_score.score_b > 1;


At present, dimension table join doesn't push down filters into the source, so
if the data associated with the given lookup key is very large, it will have
a high IO cost.
However, filter pushdown into the lookup table is on the roadmap.

Best,
Jark







On Mon, 27 Apr 2020 at 20:08, forideal  wrote:

> Hello, my friend.
>
> I have a dimension table.
>
> create table dim_u_score (u_id bigint, varchar, score_a double, score_b double) with {xxx}
>
> In one scenario, the condition of the lookup is the filter score_a > 0.9.
>
> In another scenario, the condition of the lookup is the filter score_b > 1.
>
> In Flink, at present, a lookup join can use ON to pass key values, such as
>
> select score_a ... left join ... source_table.u_id=dim_u_score.u_id
>
> If so, what should I do?
>
> If not, can I create multiple tables with the filter conditions built in, and use
> the appropriate one when needed?
>
> such as
>
> create table dim_u_score_filter_a (u_id bigint, varchar, score_a double, score_b double)
> with {"filter_condition"="score_a > 0.9"}
>
> create table dim_u_score_filter_b (u_id bigint, varchar, score_a double, score_b double)
> with {"filter_condition"="score_b > 1"}
>
> Then, in the process of lookup, push down to the specific execution engine
> to complete the lookup filter.
>
>
>
>


Re: Flink 1.9.2 why always checkpoint expired

2020-04-27 Thread qq
Hi Jiayi Liao.

  Thanks for your reply. I have added the attachment, but I can't get any useful messages from it.

 


> On Apr 27, 2020, at 12:40, Jiayi Liao  wrote:
> 
> <Pasted graphic-1.tiff>



Re: RocksDB default logging configuration

2020-04-27 Thread Bajaj, Abhinav
It seems requiring the checkpoint URL to create the RocksDBStateBackend mixes
the operational aspects of the cluster into the job.
RocksDBStateBackend stateBackend = new RocksDBStateBackend(“CHECKPOINT_URL”, 
true);
stateBackend.setDbStoragePath(“DB_STORAGE_PATH”);

Also, I noticed that the RocksDBStateBackend picks up the savepoint dir from
the property “state.savepoints.dir” of the flink-conf.yaml file but does not pick
up “state.backend.rocksdb.localdir”.
So I had to set from the job as above.

I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.
I am using Flink 1.7.1.

Thanks Chesnay for your response below.

~ Abhinav Bajaj

From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration


AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.

FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.

The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.


On 23/04/2020 03:24, Bajaj, Abhinav wrote:
Bumping this one again to catch some attention.

From: "Bajaj, Abhinav" 
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" 

Subject: RocksDB default logging configuration

Hi,

Some of our teams ran into the disk space issues because of RocksDB default 
logging configuration - 
FLINK-15068.
It seems the workaround suggested uses the OptionsFactory to set some of the 
parameters from inside the job.

Since we provision the Flink cluster(version 1.7.1) for the teams, we control 
the RocksDB statebackend configuration from flink-conf.yaml.
And it seems there isn’t any related RocksDB 
configuration
 to set in flink-conf.yaml.

Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?

Appreciate the help!

~ Abhinav Bajaj

PS:  Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
  dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}
});






Re: "Fill in" notification messages based on event time watermark

2020-04-27 Thread David Anderson
Following up on Piotr's outline, there's an example in the documentation of
how to use a KeyedProcessFunction to implement an event-time tumbling
window [1]. Perhaps that can help you get started.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example


On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure, but I don’t think there is an existing window that would do
> exactly what you want. I would suggest to go back to the
> `keyedProcessFunction` (or a custom operator?), and have a
> MapState currentStates field. Your key would
> be for example a timestamp of the beginning of your window. Value would be
> the latest state in this time window, annotated with a timestamp when this
> state was recorded.
>
> On each element:
>
> 1. you determine the window’s begin ts (key of the map)
> 2. If it’s first element, register an event time timer to publish results
> for that window’s end TS
> 3. look into the `currentStates` if it should be modified (if your new
> element is newer or first value for the given key)
>
> On event time timer firing
> 1. output the state matching to this timer
> 2. Check if there is a (more recent) value for next window, and if not:
>
> 3. copy the value to next window
> 4. Register a timer for this window to fire
>
> 5. Clean up currentStates and remove the value for the no longer needed key.
>
> I hope this helps
>
> Piotrek
>
> On 27 Apr 2020, at 12:01, Manas Kale  wrote:
>
> Hi,
> I have an upstream operator that outputs device state transition messages
> with event timestamps. Meaning it only emits output when a transition takes
> place.
> For example,
> state1 @ 1 PM
> state2 @ 2 PM
> and so on.
>
> *Using a downstream operator, I want to emit notification messages as per
> some configured periodicity.* For example, if periodicity = 20 min, in
> the above scenario this operator will output :
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
>
> *Now the main issue is that I want this to be driven by the watermark and
> not by transition events received from upstream. *Meaning I would like to
> see notification events as soon as the watermark crosses their timestamps;
> *not* when the next transition event arrives at the operator (which could
> be hours later, as above).
>
> My first solution, using a keyedProcessFunction and timers did not work as
> expected because the order in which transition events arrived at this
> operator was non-deterministic. To elaborate, assume a
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not
> fixed. To solve this, these messages need to be sorted on event time, which
> led me to my second solution.
>
> My second solution, using a EventTimeTumblingWindow with size =
> setAutoWatermarkInterval, also does not work. I sorted accumulated events
> in the window and applied notification-generation logic on them in order.
> However, I assumed that windows are created even if there are no elements.
> Since this is not the case, this solution generates notifications only when
> the next state transition message arrives, which could be hours later.
>
> Does anyone have any suggestions on how I can implement this?
> Thanks!
>
>
>
>
>


Re: "Fill in" notification messages based on event time watermark

2020-04-27 Thread Piotr Nowojski
Hi,

I’m not sure, but I don’t think there is an existing window that would do 
exactly what you want. I would suggest to go back to the `keyedProcessFunction` 
(or a custom operator?), and have a MapState 
currentStates field. Your key would be for example a timestamp of the beginning 
of your window. Value would be the latest state in this time window, annotated 
with a timestamp when this state was recorded.

On each element:

1. you determine the window’s begin ts (key of the map)
2. If it’s first element, register an event time timer to publish results for 
that window’s end TS
3. look into the `currentStates` if it should be modified (if your new element 
is newer or first value for the given key)

On event time timer firing
1. output the state matching to this timer
2. Check if there is a (more recent) value for next window, and if not:
 
3. copy the value to next window
4. Register a timer for this window to fire

5. Clean up currentStates and remove the value for the no longer needed key.

I hope this helps

Piotrek 
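A minimal sketch of the outline above, assuming events keyed by a device id and a fixed 20-minute notification period; the Event and Notification types are illustrative, and the timers still only fire once the watermark passes them.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FillInNotifier
        extends KeyedProcessFunction<String, FillInNotifier.Event, FillInNotifier.Notification> {

    public static class Event {
        public String deviceId;
        public String state;
        public long timestamp;
    }

    public static class Notification {
        public String deviceId;
        public String state;
        public long timestamp;
    }

    private static final long PERIOD = 20 * 60 * 1000L; // 20 minutes

    // window-begin timestamp -> latest state seen (or carried over) for that window
    private transient MapState<Long, Event> currentStates;

    @Override
    public void open(Configuration parameters) {
        currentStates = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("currentStates", Long.class, Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Notification> out) throws Exception {
        long windowStart = event.timestamp - (event.timestamp % PERIOD);
        if (!currentStates.contains(windowStart)) {
            // first element for this window: emit when the watermark passes the window end
            ctx.timerService().registerEventTimeTimer(windowStart + PERIOD - 1);
        }
        Event existing = currentStates.get(windowStart);
        if (existing == null || event.timestamp >= existing.timestamp) {
            currentStates.put(windowStart, event); // keep the latest state within the window
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Notification> out) throws Exception {
        long windowStart = timestamp + 1 - PERIOD;
        Event latest = currentStates.get(windowStart);
        if (latest == null) {
            return;
        }
        Notification n = new Notification();
        n.deviceId = ctx.getCurrentKey();
        n.state = latest.state;
        n.timestamp = timestamp;
        out.collect(n);

        long nextWindowStart = windowStart + PERIOD;
        if (!currentStates.contains(nextWindowStart)) {
            // no newer state yet: carry the value forward so the next period also fires
            currentStates.put(nextWindowStart, latest);
            ctx.timerService().registerEventTimeTimer(nextWindowStart + PERIOD - 1);
        }
        currentStates.remove(windowStart); // clean up the no-longer-needed entry
    }
}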

> On 27 Apr 2020, at 12:01, Manas Kale  wrote:
> 
> Hi,
> I have an upstream operator that outputs device state transition messages 
> with event timestamps. Meaning it only emits output when a transition takes 
> place.
> For example, 
> state1 @ 1 PM
> state2 @ 2 PM
> and so on. 
> 
> Using a downstream operator, I want to emit notification messages as per some 
> configured periodicity. For example, if periodicity = 20 min, in the above 
> scenario this operator will output : 
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
> 
> Now the main issue is that I want this to be driven by the watermark and not 
> by transition events received from upstream. Meaning I would like to see 
> notification events as soon as the watermark crosses their timestamps; not 
> when the next transition event arrives at the operator (which could be hours 
> later, as above).
> 
> My first solution, using a keyedProcessFunction and timers did not work as 
> expected because the order in which transition events arrived at this 
> operator was non-deterministic. To elaborate, assume a 
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not 
> fixed. To solve this, these messages need to be sorted on event time, which 
> led me to my second solution.
> 
> My second solution, using a EventTimeTumblingWindow with size = 
> setAutoWatermarkInterval, also does not work. I sorted accumulated events in 
> the window and applied notification-generation logic on them in order. 
> However, I assumed that windows are created even if there are no elements. 
> Since this is not the case, this solution generates notifications only when 
> the next state transition message arrives, which could be hours later.
> 
> Does anyone have any suggestions on how I can implement this?
> Thanks!
> 
> 
> 



Re: Task Assignment

2020-04-27 Thread Piotr Nowojski
Hi Navneeth,

But what’s the problem with using `keyBy(…)`? If you have a set of keys that 
you want to process together, in other words they are basically equal from 
the `keyBy(…)` perspective, why can’t you use this in your `KeySelector`?

Maybe to make it clear, you can think about this in two steps. You have the 
sets of keys that you want to process together, S_1, S_2, …, S_n. Each S_i
can contain multiple keys. The steps would be:
1. You could create an artificial field, the index of the set, and add it to your
record by using some mapping function.
2. You can then keyBy the records using this index.
After this, each operator instance after the keyBy will receive only keys from one of the
sets.

(Those two operations could be done also as a single step inside `KeySelector`)

Piotrek  
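A minimal sketch of the two-step idea above, assuming the element's raw key is a String and the set assignment is known up front; the keys and indexes below are illustrative.

import org.apache.flink.api.java.functions.KeySelector;

import java.util.HashMap;
import java.util.Map;

// Maps every raw key to the index of the set it belongs to, so all keys of one
// set land on the same downstream task after keyBy.
public class SetIndexKeySelector implements KeySelector<String, Integer> {

    private final Map<String, Integer> keyToSetIndex = new HashMap<>();

    public SetIndexKeySelector() {
        keyToSetIndex.put("keyA", 0);
        keyToSetIndex.put("keyB", 0); // keyA and keyB are processed together
        keyToSetIndex.put("keyC", 1);
    }

    @Override
    public Integer getKey(String recordKey) {
        return keyToSetIndex.getOrDefault(recordKey, -1);
    }
}

// usage: stream.keyBy(new SetIndexKeySelector()).process(...)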

> On 27 Apr 2020, at 09:28, Marta Paes Moreira  > wrote:
> 
> Sorry — I didn't understand you were dealing with multiple keys. 
> 
> In that case, I'd recommend you read about key-group assignment [1] and check 
> the KeyGroupRangeAssignment class [2]. 
> 
> Key-groups are assigned to parallel tasks as ranges before the job is started 
> — this is also a well-defined behaviour in Flink, with implications in state 
> reassignment on rescaling. I'm afraid that if you try to hardwire this 
> behaviour into your code, the job might not be transparently rescalable 
> anymore.
> 
> [1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html 
> 
> [2] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>  
> 
>  
> 
> On Fri, Apr 24, 2020 at 7:10 AM Navneeth Krishnan  > wrote:
> Hi Marta,
> 
> Thanks for your response. What I'm looking for is something like data
> localization. If I have one TM which is processing a set of keys, I want to
> ensure all keys of the same type go to the same TM rather than using
> hashing to find the downstream slot. I could use a common key to do this, but
> I would have to parallelize as much as possible since the number of incoming
> messages is too large to narrow down to a single key and process it.
> 
> Thanks
> 
> On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira  > wrote:
> Hi, Navneeth.
> 
> If you key your stream using stream.keyBy(…), this will logically split your 
> input and all the records with the same key will be processed in the same 
> operator instance. This is the default behavior in Flink for keyed streams 
> and transparently handled.
> 
> You can read more about it in the documentation [1].
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state
>  
> 
> On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan  > wrote:
> Hi All,
> 
> Is there a way for an upstream operator to know how the downstream operator 
> tasks are assigned? Basically I want to group my messages to be processed on 
> slots in the same node based on some key.
> 
> Thanks



Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-27 Thread aj
Hello Yang,
My Hadoop version is Hadoop 3.2.1-amzn-0,
and I have put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar in flink/lib.

Then I am getting the below error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: Invalid
rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)


If I remove the flink-shaded-hadoop-2-uber-2.8.3-10.0.jar from lib, then
I get the below error:

2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
-  Classpath:
/usr/lib/flink/lib/flink-table-blink_2.11-1.10.0.jar:/usr/lib/flink/lib/flink-table_2.11-1.10.0.jar:/usr/lib/flink/lib/log4j-1.2.17.jar:/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/lib/flink/lib/flink-dist_2.11-1.10.0.jar::/etc/hadoop/conf:/etc/hadoop/conf
2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
-

2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.process.size, 1568m
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: classloader.resolve-order, parent-first
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: s3.access-key, AKIA52DD5QA5FC7HPKXG
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: s3.secret-key, **
2020-04-27 16:59:37,305 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.ja

Re: Updating Closure Variables

2020-04-27 Thread Yun Gao
 Hi Senthil,
 I think you are right that you cannot update closure variables 
directly and expect them to show up at the workers.

 If the variable values are read from S3 files, I think currently you
will need to define a source explicitly to read the latest value of the file.
Whether to use a BroadcastStream depends on how you want to access the
set of strings: if you want to broadcast the same strings to all the tasks, then
a broadcast stream is the solution, and if you want to distribute the set of
strings in other ways, you could also use the more generic connected streams, like:
 streamA.connect(streamB.keyBy()).process(xx). [1]

Best,
 Yun

 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#datastream-transformations
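For reference, a minimal sketch of the broadcast-state approach (all names are illustrative; in a real job the stand-in sources would be replaced by the main stream and a custom source that periodically re-reads the S3 file).

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class BroadcastFilterSketch {

    // Descriptor for the broadcast state holding the current filter set.
    static final MapStateDescriptor<String, Set<String>> FILTER_DESCRIPTOR =
            new MapStateDescriptor<>("filterSet", Types.STRING,
                    TypeInformation.of(new TypeHint<Set<String>>() {}));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in sources for the sketch.
        DataStream<String> mainStream = env.fromElements("a", "b", "c");
        Set<String> initialFilters = new HashSet<>(Collections.singletonList("a"));
        DataStream<Set<String>> filterUpdates = env.fromElements(initialFilters);

        BroadcastStream<Set<String>> filterBroadcast = filterUpdates.broadcast(FILTER_DESCRIPTOR);

        mainStream
                .connect(filterBroadcast)
                .process(new BroadcastProcessFunction<String, Set<String>, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        Set<String> filters = ctx.getBroadcastState(FILTER_DESCRIPTOR).get("current");
                        if (filters != null && filters.contains(value)) {
                            out.collect(value); // keep only elements present in the filter set
                        }
                    }

                    @Override
                    public void processBroadcastElement(Set<String> value, Context ctx, Collector<String> out) throws Exception {
                        // Replace the filter set whenever a new version is broadcast.
                        ctx.getBroadcastState(FILTER_DESCRIPTOR).put("current", value);
                    }
                })
                .print();

        env.execute("broadcast filter sketch");
    }
}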



--
From:Senthil Kumar 
Send Time:2020 Apr. 27 (Mon.) 21:51
To:user@flink.apache.org 
Subject:Updating Closure Variables

Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BroadcastStream in Flink.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.

cheers
Kumar



RE: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Hailu, Andreas
bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/
total 8
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43 
flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22 
flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don't see cache directories from my 
attempts today, which is interesting. Looking a little deeper into them:

bash-4.1$ ls -lr 
/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
total 1756
drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs
bash-4.1$ ls -lr 
/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs
total 0
-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS - I've included some in my initial 
mail, but here they are again just for reference:
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...


// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 10:28 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

If historyserver.web.tmpdir is not set then java.io.tmpdir is used, so that 
should be fine.

What are the contents of /local/scratch/flink_historyserver_tmpdir?
I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:
My machine's /tmp directory is not large enough to support the archived files, 
so I changed my java.io.tmpdir to be in some other location which is 
significantly larger. I hadn't set anything for historyserver.web.tmpdir, so I 
suspect it was still pointing at /tmp. I just tried setting 
historyserver.web.tmpdir to the same location as my java.io.tmpdir location, 
but I'm afraid I'm still seeing the following issue:

2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader
2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader

flink-conf.yaml for reference:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 5:56 AM
To: Hailu, Andreas [Engineering] 
; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?


overview.json is a generated file that is placed in the local directory 
controlled by historyserver.web.tmpdir.

Have you configured this option to point to some non-local filesystem? (Or if 
not, is the java.io.tmpdir property pointing somewhere funny?)
On 24/04/2020 18:24, Hailu, Andreas wrote:
I'm having a further look at the code in HistoryServerStaticFileServerHandler - 
is there an assumption about where overview.json is supposed to be located?

// ah

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, 
Andreas [Engineering] 
; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We're using Flink 1.9.1. I enabled DEBUG 
level logging and this is something relevant I see:

2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - Connecting to datanode 10.79.252.101:1019
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, 
remoteHostTrusted = false
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL client skipping handshake in secured 
configuration with privileged port for addr = /10.79.252.101, datanodeId = 
DatanodeI
nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]
2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - DFSInputStream has been closed already
2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServer

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Dawid Wysakowicz
What I meant by "Unfortunately you can not reorder the fields that way."
is that

   tableEnv.fromDataStream(input, "name, age, height");

uses the so-called reference-by-position mode. It will name the f0 field
-> name, f1 -> age, and f2 -> height.


If it wasn't for the bug you could reorder and rename at the same time:

   tableEnv.fromDataStream(input, "f1 as name, f2 as age, f0 as height")
// it reorders the fields of the POJO to the order f1, f2, f0 and gives
them aliases

With a fix it should be possible yes.

Best,

Dawid


On 27/04/2020 17:24, Gyula Fóra wrote:
> Hi Dawid,
>
> Thanks for the clarification on this issue and I agree that there is
> too much going on with these conversions already.
>
> What do you mean by "Unfortunately you can not reorder the fields that
> way." ?
> I can reorder POJO fields even after aliasing and also tuple fields
> (f1, f0) so I assume reordering will still work if tuple and row
> aliasing is fixed.
>
> I will open a JIRA for this!
>
> Thanks!
> Gyula
>
> On Mon, Apr 27, 2020 at 4:58 PM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Hi Gyula,
>
> I think you are hitting a bug with the naming/aliasing of the
> fields of a Tuple. The bug is in the
> org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
> method, as it does not work correctly for aliases. Would you mind
> creating an issue for it?
>
> You should be able to alias the fields as follows:
>
> tableEnv.fromDataStream(input, “name, age, height");
>
> Unfortunately you can not reorder the fields that way.
>
> If you want to flatten/extract nested fields you should be able to
> do that in a subsequent operation. The method fromDataStream is
> supposed to register the entire DataStream as a Table and it does
> not support projections etc.
>
> tableEnv.fromDataStream(input, “name, age, height")
>
> .select("name.f0 as nameF0, age.flatten, ...");
>
> Side note. In my opinion this method (fromDataStream(DataStream,
> Expression/String... fields)) has already too many
> responsibilities and is hard to understand. (You can reorder
> fields, rename fields without alias, rename fields with an alias,
> alias works differently depending of the available fields or type
> etc.). In the long term I'd prefer to come up with a better way of
> creating a Table out of a DataStream.
>
> BTW The way we can fix the renaming + reordering is by changing
> the method I mentioned:
>
>     public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
>         if (!(ct instanceof TupleTypeInfoBase)) {
>             return false;
>         }
>
>         List<String> inputNames = Arrays.asList(ct.getFieldNames());
>
>         // Use the by-position mode if no of the fields exists in the input.
>         // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
>         // by position but the user might assume reordering instead of renaming.
>         return Arrays.stream(fields).allMatch(f -> {
>             if (f instanceof UnresolvedCallExpression &&
>                     ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
>                     f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
>                 return false;
>             }
>
>             if (f instanceof UnresolvedReferenceExpression) {
>                 return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
>             }
>
>             return true;
>         });
>     }
>
>
> Best,
>
> Dawid
>
>
> On 27/04/2020 15:57, Gyula Fóra wrote:
>> Hi Leonard,
>>
>> The tuple fields can also be referenced as their POJO names (f0,
>> f1), they can be reordered similar to pojo fields, however you
>> cannot alias them. (If you look at the link I have sent that
>> shows how it is supposed to work but it throws an exception when
>> I try it)
>> Also what I am trying to do at the end is to flatten a nested tuple:
>>
>> Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's
>> say name, age, height
>>
>> Normally I would write this: tableEnv.fromDataStream(input, “f0
>> as name, f1.f0 as age, f1.f1 as height");
>> However this doesnt work and there seem to be no way to assign
>> names to the nested tuple columns anyways.
>>
>> For Pojo aliasing works  but still I cannot find a way to unnest
>> a nested object:
>>
>> public static class Person {
>>   public String name;
>>   public Tuple2<Integer, Integer> details;
>> }
>>
>> tableEnv.fromDataStream(persons, "name, details.f0 as age,
>> details.f1 as height")
>>
>> this leads to an error: 
>> Field reference expression or alias on field expression expected.
>>
>> Aliasing fields also doesn't work when converting from Row stream even if
>> the column names are provided in the type info.

Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Benchao Li
Hi izual,

IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
The state is not managed in the UDAF; it's managed by the aggregation operator, and
your UDAF's accumulator will be handled by the operator using state.

izual  wrote on Mon, Apr 27, 2020 at 11:21 PM:

> Thanks, Benchao.
>
> Maybe changing the dimension table would work, but this changes a lot; for
> example, `size/count` is not a column of a single dim table.
> I notice that user can define Aggregate Functions[1],  but this page also
> said:
> > Accumulators are automatically backup-ed by Flink’s checkpointing
> mechanism and restored
> So is it right to implement my own COUNT/SUM UDF?
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>
>
>
>
>
>
> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>
> Hi,
>
> There is indeed a state for the aggregation result, however we cannot
> disable it, it's by design.
> StreamQueryConfig.maxIdleStateRetentionTime can control how long the state
> will be kept.
> If you can ensure the time gap between two records of the same id is larger
> than, for example,
> 1 min, then setting the retention time to 1 min can resolve your issue.
> If not, maybe you need to change your dimension table, making it return
> the count directly instead
> of returning the details.
>
izual  wrote on Mon, Apr 27, 2020 at 5:06 PM:
>
>> I implements my DimTable by extends `LookupTableSource`[1], which stores
>> data like:
>>
>> id=1 -> (SH, BJ, SD)
>>
>> id=2 -> (...)
>>
>> and then extends `TableFunction` to return the value corresponding to the
>> lookup keys,and maybe return multi rows, for example, when lookupkeys is
>> id=1, then in the `TableFunction.eval`
>>
>> ```
>>
>> collect('SH')
>>
>> collect('BJ')
>>
>> collect('SD')
>>
>> ```
>>
>>
>> Now I want to get the region'count by id, which is from the tblEvent.id,
>> sql is :
>>
>>
>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>> tblEvent.id
>>
>>
>> I expect the result of COUNT is always 3 for id = 1, no matter the id=1
>> appears how many times.
>>
>> but the actual result is : 3, 6, 9, ...
>>
>>
>> I think this is bcz the state mechanism behind COUNT, how to turn this
>> off?
>>
>> Or what's the correct use for this?
>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>
>>
>> The reason not using state in flink:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>
>>
>>
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi Dawid,

Thanks for the clarification on this issue and I agree that there is too
much going on with these conversions already.

What do you mean by "Unfortunately you can not reorder the fields that
way." ?
I can reorder POJO fields even after aliasing and also tuple fields (f1,
f0) so I assume reordering will still work if tuple and row aliasing is
fixed.

I will open a JIRA for this!

Thanks!
Gyula

On Mon, Apr 27, 2020 at 4:58 PM Dawid Wysakowicz 
wrote:

> Hi Gyula,
>
> I think you are hitting a bug with the naming/aliasing of the fields of a
> Tuple. The bug is in the
> org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
> method. As it does not work correctly for aliases. Would you mind creating
> an issue for it?
>
> You should be able to alias the fields as follows:
>
> tableEnv.fromDataStream(input, “name, age, height");
>
> Unfortunately you can not reorder the fields that way.
>
> If you want to flatten/extract nested fields you should be able to do that
> in a subsequent operation. The method fromDataStream is supposed to
> register the entire DataStream as a Table and it does not support
> projections etc.
>
> tableEnv.fromDataStream(input, “name, age, height")
>
> .select("name.f0 as nameF0, age.flatten, ...");
>
> Side note. In my opinion this method (fromDataStream(DataStream,
> Expression/String... fields)) has already too many responsibilities and is
> hard to understand. (You can reorder fields, rename fields without alias,
> rename fields with an alias, alias works differently depending of the
> available fields or type etc.). In the long term I'd prefer to come up with
> a better way of creating a Table out of a DataStream.
>
> BTW The way we can fix the renaming + reordering is by changing the method
> I mentioned:
>
> public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
>     if (!(ct instanceof TupleTypeInfoBase)) {
>         return false;
>     }
>
>     List<String> inputNames = Arrays.asList(ct.getFieldNames());
>
>     // Use the by-position mode if no of the fields exists in the input.
>     // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
>     // by position but the user might assume reordering instead of renaming.
>     return Arrays.stream(fields).allMatch(f -> {
>         if (f instanceof UnresolvedCallExpression &&
>                 ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
>                 f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
>             return false;
>         }
>
>         if (f instanceof UnresolvedReferenceExpression) {
>             return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
>         }
>
>         return true;
>     });
> }
>
>
> Best,
>
> Dawid
>
>
> On 27/04/2020 15:57, Gyula Fóra wrote:
>
> Hi Leonard,
>
> The tuple fields can also be referenced as their POJO names (f0, f1), they
> can be reordered similar to pojo fields, however you cannot alias them. (If
> you look at the link I have sent that shows how it is supposed to work but
> it throws an exception when I try it)
> Also what I am trying to do at the end is to flatten a nested tuple:
>
> Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's say name,
> age, height
>
> Normally I would write this: tableEnv.fromDataStream(input, “f0 as name,
> f1.f0 as age, f1.f1 as height");
> However this doesnt work and there seem to be no way to assign names to
> the nested tuple columns anyways.
>
> For Pojo aliasing works  but still I cannot find a way to unnest a nested
> object:
>
> public static class Person {
>   public String name;
>   public Tuple2<Integer, Integer> details;
> }
>
> tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as
> height")
>
> this leads to an error:
> Field reference expression or alias on field expression expected.
>
> Aliasing fields also doesn't work when converting from Row stream even if
> the column names are provided in the type info.
>
> Cheers,
> Gyula
>
> On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  wrote:
>
>> Hi,  gyula.fora
>>
>> If you’re trying convert Table from a Tuple DataStream, Alias the filed
>> by `as` expression is no supported yet,
>> because all fields are referenced by position in this point. You can
>> simply alias like following syntax:
>> ```
>> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
>> ```
>> This should satisfy  your purpose. And back to the 1.10 docs, If you are
>> converting Table from a
>> POJO(assuming the POJO person has two fields name and age) DataStream,
>> Alias the filed by `as` is supported
>> because this point all fields are referenced by name, like:
>> ```
>> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as
>> age_alias, name as user_name,");
>> ```
>>
>>
>> Best,
>> Leonard, Xu
>
>


Re:Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
Thanks, Benchao.


Maybe changing the dimension table would work, but this changes a lot; for example,
`size/count` is not a column of a single dim table.
I notice that user can define Aggregate Functions[1],  but this page also said:
> Accumulators are automatically backup-ed by Flink’s checkpointing mechanism 
> and restored
So is it right to implement my own COUNT/SUM UDF?


[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions

At 2020-04-27 17:32:14, "Benchao Li"  wrote:

Hi,


There is indeed a state for the aggregation result, however we cannot disable 
it, it's by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state will 
be kept.
If you can ensure the time gap between two records of the same id is larger than, 
for example, 
1 min, then setting the retention time to 1 min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the 
count directly instead 
of returning the details.


izual  wrote on Mon, Apr 27, 2020 at 5:06 PM:


I implements my DimTable by extends `LookupTableSource`[1], which stores data 
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extends `TableFunction` to return the value corresponding to the 
lookup keys,and maybe return multi rows, for example, when lookupkeys is id=1, 
then in the `TableFunction.eval`

```

collect('SH')

collect('BJ')

collect('SD')

```




Now I want to get the region'count by id, which is from the tblEvent.id, sql is 
:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT is always 3 for id = 1, no matter the id=1 appears 
how many times.

but the actual result is : 3, 6, 9, ...




I think this is bcz the state mechanism behind COUNT, how to turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason not using state in flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Dawid Wysakowicz
Hi Gyula,

I think you are hitting a bug with the naming/aliasing of the fields of
a Tuple. The bug is in the
org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
method. As it does not work correctly for aliases. Would you mind
creating an issue for it?

You should be able to alias the fields as follows:

tableEnv.fromDataStream(input, “name, age, height");

Unfortunately you can not reorder the fields that way.

If you want to flatten/extract nested fields you should be able to do
that in a subsequent operation. The method fromDataStream is supposed to
register the entire DataStream as a Table and it does not support
projections etc.

tableEnv.fromDataStream(input, “name, age, height")

.select("name.f0 as nameF0, age.flatten, ...");

Side note. In my opinion this method (fromDataStream(DataStream,
Expression/String... fields)) has already too many responsibilities and
is hard to understand. (You can reorder fields, rename fields without
alias, rename fields with an alias, alias works differently depending of
the available fields or type etc.). In the long term I'd prefer to come
up with a better way of creating a Table out of a DataStream.

BTW The way we can fix the renaming + reordering is by changing the
method I mentioned:

    public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
        if (!(ct instanceof TupleTypeInfoBase)) {
            return false;
        }

        List<String> inputNames = Arrays.asList(ct.getFieldNames());

        // Use the by-position mode if no of the fields exists in the input.
        // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
        // by position but the user might assume reordering instead of renaming.
        return Arrays.stream(fields).allMatch(f -> {
            if (f instanceof UnresolvedCallExpression &&
                    ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
                    f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
                return false;
            }

            if (f instanceof UnresolvedReferenceExpression) {
                return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
            }

            return true;
        });
    }


Best,

Dawid


On 27/04/2020 15:57, Gyula Fóra wrote:
> Hi Leonard,
>
> The tuple fields can also be referenced as their POJO names (f0, f1),
> they can be reordered similar to pojo fields, however you cannot alias
> them. (If you look at the link I have sent that shows how it is
> supposed to work but it throws an exception when I try it)
> Also what I am trying to do at the end is to flatten a nested tuple:
>
> Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's say
> name, age, height
>
> Normally I would write this: tableEnv.fromDataStream(input, “f0 as
> name, f1.f0 as age, f1.f1 as height");
> However this doesnt work and there seem to be no way to assign names
> to the nested tuple columns anyways.
>
> For Pojo aliasing works  but still I cannot find a way to unnest a
> nested object:
>
> public static class Person {
>   public String name;
>   public Tuple2<Integer, Integer> details;
> }
>
> tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1
> as height")
>
> this leads to an error: 
> Field reference expression or alias on field expression expected.
>
> Aliasing fields also doesn't work when converting from Row stream even
> if the column names are provided in the type info.
>
> Cheers,
> Gyula
>
> On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  > wrote:
>
> Hi,  gyula.fora
>
> If you’re trying convert Table from a Tuple DataStream, Alias the
> filed by `as` expression is no supported yet,
> because all fields are referenced by position in this point. You
> can simply alias like following syntax:
> ```
> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)),
> “name, age");
> ```
> This should satisfy  your purpose. And back to the 1.10 docs, If
> you are converting Table from a
> POJO(assuming the POJO person has two fields name and age)
> DataStream, Alias the filed by `as` is supported
> because this point all fields are referenced by name, like:
> ```
> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)),
> “age as age_alias, name as user_name,");
> ```
>
>
> Best,
> Leonard, Xu
>




Re: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Chesnay Schepler
If historyserver.web.tmpdir is not set then java.io.tmpdir is used, so 
that should be fine.


What are the contents of /local/scratch/flink_historyserver_tmpdir?
I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:


My machine’s /tmp directory is not large enough to support the 
archived files, so I changed my java.io.tmpdir to be in some other 
location which is significantly larger. I hadn’t set anything for 
historyserver.web.tmpdir, so I suspect it was still pointing at /tmp. 
I just tried setting historyserver.web.tmpdir to the same location as 
my java.io.tmpdir location, but I’m afraid I’m still seeing the 
following issue:


2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader


2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader


flink-conf.yaml for reference:

jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Monday, April 27, 2020 5:56 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

overview.json is a generated file that is placed in the local 
directory controlled by historyserver.web.tmpdir.


Have you configured this option to point to some non-local filesystem? 
(Or if not, is the java.io.tmpdir property pointing somewhere funny?)


On 24/04/2020 18:24, Hailu, Andreas wrote:

I’m having a further look at the code in
HistoryServerStaticFileServerHandler - is there an assumption
about where overview.json is supposed to be located?

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Wednesday, April 22, 2020 1:32 PM
*To:* 'Chesnay Schepler' 
; Hailu, Andreas [Engineering]

; user@flink.apache.org

*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We’re using Flink 1.9.1. I
enabled DEBUG level logging and this is something relevant I see:

2020-04-22 13:25:52,566
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG DFSInputStream
- Connecting to datanode 10.79.252.101:1019

2020-04-22 13:25:52,567
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG
SaslDataTransferClient - SASL encryption trust check:
localHostTrusted = false, remoteHostTrusted = false

2020-04-22 13:25:52,567
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG
SaslDataTransferClient - SASL client skipping handshake in secured
configuration with privileged port for addr = /10.79.252.101,
datanodeId = DatanodeInfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]

*2020-04-22 13:25:52,571
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG DFSInputStream
- DFSInputStream has been closed already*

*2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG
HistoryServerStaticFileServerHandler - Unable to load requested
file /jobs/overview.json from classloader*

2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG
Client$Connection$3 - IPC Client (1578587450) connection to
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com
 sending #1391

Aside from that, it looks like a lot of logging around datanodes
and block location metadata. Did I miss something in my classpath,
perhaps? If so, do you have a suggestion on what I could try?

*// *ah

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Wednesday, April 22, 2020 2:16 AM
*To:* Hailu, Andreas [Engineering] mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?

Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:

Hi,

I’m trying to set up the History Server, but none of my
applications are showing up in the Web UI. Looking at the
console, I see that all of the calls to /overview return the
following 404 response: {"errors":["File not found."]}.

I’ve set up my configuration as follows:

JobManager Archive directory:

*jobmanager.archive.fs.dir*:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 442

Re: Flink 1.9.2 why always checkpoint expired

2020-04-27 Thread Congxian Qiu
Hi
The image is not very clear.
For RocksDBStateBackend, do you enable incremental checkpoint?

Currently, checkpoint on TM side contains some steps:
1 barrier align
2 sync snapshot
3 async snapshot

For expired checkpoint, could you please check the tasks in the first
operator of the DAG to find out why it timed out.
- is there any backpressure? (affects barrier alignment)
- is the disk util/network util high? (affects steps 2 & 3)
- is the task thread too busy? (this can lead to the barrier being processed
late)

You can enable the debug log to find out more info.
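
For example, a rough sketch of settings worth double-checking (paths and values are
placeholders, not recommendations):

```
// (inside a method that declares throws Exception; RocksDBStateBackend's constructor throws IOException)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
env.enableCheckpointing(60_000);                                 // trigger a checkpoint every 60s
env.getCheckpointConfig().setCheckpointTimeout(30 * 60_000);     // allow 30 min before a checkpoint expires
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // breathing room between checkpoints
```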

Best,
Congxian


qq <471237...@qq.com> wrote on Mon, Apr 27, 2020 at 12:34 PM:

> Hi all,
>
> Why do my Flink checkpoints always expire? I use RocksDB checkpoints,
> and I can't get any useful messages about this. Could you help me? Thanks
> very much.
>
>
>
>


RE: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Hailu, Andreas
My machine's /tmp directory is not large enough to support the archived files, 
so I changed my java.io.tmpdir to be in some other location which is 
significantly larger. I hadn't set anything for historyserver.web.tmpdir, so I 
suspect it was still pointing at /tmp. I just tried setting 
historyserver.web.tmpdir to the same location as my java.io.tmpdir location, 
but I'm afraid I'm still seeing the following issue:

2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader
2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader

flink-conf.yaml for reference:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 5:56 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?


overview.json is a generated file that is placed in the local directory 
controlled by historyserver.web.tmpdir.

Have you configured this option to point to some non-local filesystem? (Or if 
not, is the java.io.tmpdir property pointing somewhere funny?)
On 24/04/2020 18:24, Hailu, Andreas wrote:
I'm having a further look at the code in HistoryServerStaticFileServerHandler - 
is there an assumption about where overview.json is supposed to be located?

// ah

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, 
Andreas [Engineering] 
; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We're using Flink 1.9.1. I enabled DEBUG 
level logging and this is something relevant I see:

2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - Connecting to datanode 10.79.252.101:1019
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, 
remoteHostTrusted = false
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL client skipping handshake in secured 
configuration with privileged port for addr = /10.79.252.101, datanodeId = 
DatanodeI
nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]
2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - DFSInputStream has been closed already
2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader
2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG 
Client$Connection$3 - IPC Client (1578587450) connection to 
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com 
sending #1391

Aside from that, it looks like a lot of logging around datanodes and block 
location metadata. Did I miss something in my classpath, perhaps? If so, do you 
have a suggestion on what I could try?

// ah

From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Wednesday, April 22, 2020 2:16 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?
Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:
Hi,

I'm trying to set up the History Server, but none of my applications are 
showing up in the Web UI. Looking at the console, I see that all of the calls 
to /overview return the following 404 response: {"errors":["File not found."]}.

I've set up my configuration as follows:

JobManager Archive directory:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...
...

History Server will fetch the archived jobs from the same location:
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

So I'm able t

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi Leonard,

The tuple fields can also be referenced as their POJO names (f0, f1), they
can be reordered similar to pojo fields, however you cannot alias them. (If
you look at the link I have sent that shows how it is supposed to work but
it throws an exception when I try it)
Also what I am trying to do at the end is to flatten a nested tuple:

Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's say name,
age, height

Normally I would write this: tableEnv.fromDataStream(input, “f0 as name,
f1.f0 as age, f1.f1 as height");
However, this doesn't work and there seems to be no way to assign names to the
nested tuple columns anyway.

For POJOs, aliasing works, but I still cannot find a way to unnest a nested
object:

public static class Person {
  public String name;
  public Tuple2<Integer, Integer> details;
}

tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as
height")

this leads to an error:
Field reference expression or alias on field expression expected.

Aliasing fields also doesn't work when converting from Row stream even if
the column names are provided in the type info.

Cheers,
Gyula

On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  wrote:

> Hi,  gyula.fora
>
> If you’re trying convert Table from a Tuple DataStream, Alias the filed by
> `as` expression is no supported yet,
> because all fields are referenced by position in this point. You can
> simply alias like following syntax:
> ```
> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
> ```
> This should satisfy  your purpose. And back to the 1.10 docs, If you are
> converting Table from a
> POJO(assuming the POJO person has two fields name and age) DataStream,
> Alias the filed by `as` is supported
> because this point all fields are referenced by name, like:
> ```
> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as
> age_alias, name as user_name,");
> ```
>
>
> Best,
> Leonard, Xu


Updating Closure Variables

2020-04-27 Thread Senthil Kumar
Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BroadcastStream in Flink.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.

cheers
Kumar
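
For what it's worth, a minimal sketch of the broadcast-state approach (the `filterUpdates`
control stream and all names are assumptions, e.g. a custom source that re-reads the S3
location every 24 hours; this is not a definitive implementation):

```
// Descriptor for the broadcast state that holds the current filter set.
MapStateDescriptor<String, Boolean> filterDesc =
        new MapStateDescriptor<>("filter-set", Types.STRING, Types.BOOLEAN);

// `filterUpdates` is an assumed DataStream<String> emitting the refreshed keys.
BroadcastStream<String> control = filterUpdates.broadcast(filterDesc);

// `events` is the assumed main DataStream<String>; keep only elements present in the set.
DataStream<String> filtered = events
        .connect(control)
        .process(new BroadcastProcessFunction<String, String, String>() {

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                if (ctx.getBroadcastState(filterDesc).contains(value)) {
                    out.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(String key, Context ctx, Collector<String> out)
                    throws Exception {
                // every parallel instance receives the update and stores it in broadcast state
                ctx.getBroadcastState(filterDesc).put(key, true);
            }
        });
```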


Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Leonard Xu
Hi,  gyula.fora

If you’re trying convert Table from a Tuple DataStream, Alias the filed by `as` 
expression is no supported yet,
because all fields are referenced by position in this point. You can simply 
alias like following syntax:
```
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
```
This should satisfy  your purpose. And back to the 1.10 docs, If you are 
converting Table from a 
POJO(assuming the POJO person has two fields name and age) DataStream, Alias 
the filed by `as` is supported 
because this point all fields are referenced by name, like:
```
tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as 
age_alias, name as user_name,");
```


Best,
Leonard, Xu

Re: Streaming Job eventually begins failing during checkpointing

2020-04-27 Thread Yu Li
Sorry, just noticed this thread...

@Stephan I cannot remember the discussion but I think it's an interesting
topic, will find some time to consider it (unregister states).

@Eleanore Glad to know that Beam community has fixed it and thanks for the
reference.

Best Regards,
Yu


On Sun, 26 Apr 2020 at 03:10, Eleanore Jin  wrote:

> Hi All,
>
> I think the Beam Community fixed this issue:
> https://github.com/apache/beam/pull/11478
>
> Thanks!
> Eleanore
>
> On Thu, Apr 23, 2020 at 4:24 AM Stephan Ewen  wrote:
>
>> If something requires Beam to register a new state each time, then this
>> is tricky, because currently you cannot unregister states from Flink.
>>
>> @Yu @Yun I remember chatting about this (allowing to explicitly
>> unregister states so they get dropped from successive checkpoints) at some
>> point, but I could not find a jira ticket for this. Do you remember what
>> the status of that discussion is?
>>
>> On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel  wrote:
>>
>>> I posted to the beam mailing list:
>>> https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E
>>>
>>> I think this is related to a Beam feature called RequiresStableInput
>>> (which my pipeline is using).  It will create a new operator (or keyed)
>>> state per checkpoint.  I'm not sure that there are any parameters that I
>>> have control over to tweak it's behavior (apart from increasing the
>>> checkpoint interval to let the pipeline run longer before building up that
>>> many states).
>>>
>>> Perhaps this is something that can be fixed (maybe by unregistering
>>> Operator States after they aren't used any more in the RequiresStableInput
>>> code).  It seems to me that this isn't a Flink issue, but rather a Beam
>>> issue.
>>>
>>> Thanks for pointing me in the right direction.
>>>
>>> On Thu, Apr 16, 2020 at 11:29 AM Yun Tang  wrote:
>>>
 Hi Stephen

 I think the state name [1], which would be changed every time, might be the
 root cause. I am not familiar with the Beam code; would it be possible to
 create so many operator states? Did you configure some parameters wrongly?


 [1]
 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95

 Best
 Yun Tang
 --
 *From:* Stephen Patel 
 *Sent:* Thursday, April 16, 2020 22:30
 *To:* Yun Tang 
 *Cc:* user@flink.apache.org 
 *Subject:* Re: Streaming Job eventually begins failing during
 checkpointing

 Correction.  I've actually found a place where it potentially might be
 creating a new operator state per checkpoint:

 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

 This gives me something I can investigate locally at least.

 On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel 
 wrote:

 I can't say that I ever call that directly.  The beam library that I'm
 using does call it in a couple places:
 https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

 But it seems to be the same descriptor every time.  Is that limit per
 operator?  That is, can each operator host up to 32767 operator/broadcast
 states?  I assume that's by name?

 On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:

 Hi  Stephen

 This is not related with RocksDB but with default on-heap operator
 state backend. From your exception stack trace, you have created too many
 operator states (more than 32767).
 How do you call context.getOperatorStateStore().getListState or
 context.getOperatorStateStore().getBroadcastState ? Did you pass a
 different operator state descriptor each time?

 Best
 Yun Tang
 --
 *From:* Stephen Patel 
 *Sent:* Thursday, April 16, 2020 2:09
 *To:* user@flink.apache.org 
 *Subject:* Streaming Job eventually begins failing during checkpointing

 I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
 configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
 operates just fine for around 20 days, and then begins failing with this
 exception (it fails, restarts, and fails again, repeatedly):

 2020-04-15 13:15:02,920 INFO
  org.apache.flink.runtime.checkpoint.Checkp

Flink Lookup Filter Pushdown

2020-04-27 Thread forideal
Hello, my friend.

I have a dimension table.

create table dim_u_score (u_id bigint, varchar, score_a double, score_b double) with {xxx}

In one scenario, the condition of the lookup is: filter score_a > 0.9

In another scenario

The condition of the lookup is: filter score_b > 1

In Flink, at present, a lookup join can use ON to pass key values, such as

select score_a ... left join ... on source_table.u_id = dim_u_score.u_id

If so, what should I do?

If not, can I instead create multiple tables with the conditions attached, to be used 
as needed?

such as

create table dim_u_score_filter_a (u_id bigint, varchar, score_a double, score_b double)
with {"filter_condition" = "score_a > 0.9"}

create table dim_u_score_filter_b (u_id bigint, varchar, score_a double, score_b double)
with {"filter_condition" = "filter score_b > 1"}

Then, in the process of lookup, push down to the specific execution engine to 
complete the lookup filter.

Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi All!

I was trying to flatten a nested tuple into named columns with the
fromDataStream method and I hit some problems with mapping tuple fields to
column names.

It seems like the `f0 as ColumnName` kind of expressions are not parsed
correctly.

It is very easy to reproduce:
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "f0 as name,
f1 as age");

This leads to the following 2 kinds of errors depending on how you write
it:
 - Alias 'name' is not allowed if other fields are referenced by position.
 - Could not parse expression at column 7: `(' expected but `'' found
f0 as 'name', f1 as 'age'

I could not find any test cases that would use this logic so I wonder if I
am doing something wrong here, the docs show that this should be possible:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#tuples-scala-and-java-and-case-classes-scala-only

I was actually trying to extract nested tuple fields this way but I did not
get that far. It also seems to fail for Row data types.

What am I doing wrong?

Gyula


"Fill in" notification messages based on event time watermark

2020-04-27 Thread Manas Kale
Hi,
I have an upstream operator that outputs device state transition messages
with event timestamps. Meaning it only emits output when a transition takes
place.
For example,
state1 @ 1 PM
state2 @ 2 PM
and so on.

*Using a downstream operator, I want to emit notification messages as per
some configured periodicity.* For example, if periodicity = 20 min, in the
above scenario this operator will output :
state1 notification @ 1PM
state1 notification @ 1.20PM
state1 notification @ 1.40PM
 ...

*Now the main issue is that I want this to be driven by the watermark and
not by transition events received from upstream. *Meaning I would like to
see notification events as soon as the watermark crosses their timestamps;
*not* when the next transition event arrives at the operator (which could
be hours later, as above).

My first solution, using a keyedProcessFunction and timers did not work as
expected because the order in which transition events arrived at this
operator was non-deterministic. To elaborate, assume a
setAutoWatermarkInterval of 10 seconds.
If we get transition events :
state1 @ 1sec
state2 @ 3 sec
state3 @ 5 sec
state1 @ 8 sec
the order in which these events arrived at my keyedProcessFunction was not
fixed. To solve this, these messages need to be sorted on event time, which
led me to my second solution.

My second solution, using an EventTimeTumblingWindow with size =
setAutoWatermarkInterval, also does not work. I sorted accumulated events
in the window and applied notification-generation logic on them in order.
However, I assumed that windows are created even if there are no elements.
Since this is not the case, this solution generates notifications only when
the next state tranisition message arrives, which could be hours later.

Does anyone have any suggestions on how I can implement this?
Thanks!
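
For reference, a rough sketch of the timer-driven variant described above (all names are
assumptions; the input is modelled as Tuple2<stateName, eventTimestamp>, and out-of-order
transitions are handled only by keeping the newest event time):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicNotifier extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private final long periodMillis;
    private transient ValueState<Tuple2<String, Long>> lastTransition;
    private transient ValueState<Boolean> timerActive;

    public PeriodicNotifier(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    @Override
    public void open(Configuration parameters) {
        lastTransition = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "lastTransition", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        timerActive = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerActive", Boolean.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> transition, Context ctx, Collector<String> out)
            throws Exception {
        Tuple2<String, Long> current = lastTransition.value();
        // keep only the transition with the newest event time
        if (current == null || transition.f1 >= current.f1) {
            lastTransition.update(transition);
        }
        // start a single per-key timer chain on the first transition
        if (timerActive.value() == null) {
            ctx.timerService().registerEventTimeTimer(transition.f1 + periodMillis);
            timerActive.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Tuple2<String, Long> current = lastTransition.value();
        if (current != null) {
            out.collect(ctx.getCurrentKey() + ": " + current.f0 + " notification @ " + timestamp);
            // re-register so output is driven by the watermark, not by the next input record
            ctx.timerService().registerEventTimeTimer(timestamp + periodMillis);
        }
    }
}
```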


Re: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Chesnay Schepler
overview.json is a generated file that is placed in the local directory 
controlled by historyserver.web.tmpdir.


Have you configured this option to point to some non-local filesystem? 
(Or if not, is the java.io.tmpdir property pointing somewhere funny?)


On 24/04/2020 18:24, Hailu, Andreas wrote:


I’m having a further look at the code in 
HistoryServerStaticFileServerHandler - is there an assumption about 
where overview.json is supposed to be located?


// ah

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, Andreas 
[Engineering] ; user@flink.apache.org

Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We’re using Flink 1.9.1. I enabled 
DEBUG level logging and this is something relevant I see:


2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG DFSInputStream - Connecting to datanode 10.79.252.101:1019


2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG SaslDataTransferClient - SASL encryption trust check: 
localHostTrusted = false, remoteHostTrusted = false


2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG SaslDataTransferClient - SASL client skipping handshake in 
secured configuration with privileged port for addr = /10.79.252.101, 
datanodeId = DatanodeI


nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]

2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG DFSInputStream - DFSInputStream has been closed already


2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader


2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG 
Client$Connection$3 - IPC Client (1578587450) connection to 
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com 
 sending #1391


Aside from that, it looks like a lot of logging around datanodes and 
block location metadata. Did I miss something in my classpath, 
perhaps? If so, do you have a suggestion on what I could try?


// ah

From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Wednesday, April 22, 2020 2:16 AM
To: Hailu, Andreas [Engineering] >; user@flink.apache.org 


Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?

Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:

Hi,

I’m trying to set up the History Server, but none of my
applications are showing up in the Web UI. Looking at the console,
I see that all of the calls to /overview return the following 404
response: {"errors":["File not found."]}.

I’ve set up my configuration as follows:

JobManager Archive directory:

jobmanager.archive.fs.dir:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 44282 items

-rw-r- 3 delp datalake_admin_dev  50569 2020-03-21 23:17
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936

-rw-r- 3 delp datalake_admin_dev  49578 2020-03-03 08:45
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5

-rw-r- 3 delp datalake_admin_dev  50842 2020-03-24 15:19
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757

...

...

History Server will fetch the archived jobs from the same location:

historyserver.archive.fs.dir:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

So I’m able to confirm that there are indeed archived applications
that I should be able to view in the histserver. I’m not able to
find out what file the overview service is looking for from the
repository – any suggestions as to what I could look into next?

Best,

Andreas






Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Benchao Li
Hi,

There is indeed a state for the aggregation result, however we cannot
disable it, it's by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state
will be kept.
If you can ensure the time gap between two records of the same id is larger
than, for example,
1 min, then setting the retention time to 1 min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the
count directly instead
of returning the details.
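
For reference, a minimal sketch of the retention setting (assuming a Java
StreamTableEnvironment on Flink 1.9/1.10; `tableEnv`, `resultTable` and the values are
placeholders):

```
// Idle per-key aggregation state is cleaned up after 1-10 minutes of inactivity.
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(10));

tableEnv.toRetractStream(resultTable, Row.class, queryConfig)
        .print();
```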

izual  wrote on Mon, Apr 27, 2020 at 5:06 PM:

> I implements my DimTable by extends `LookupTableSource`[1], which stores
> data like:
>
> id=1 -> (SH, BJ, SD)
>
> id=2 -> (...)
>
> and then extends `TableFunction` to return the value corresponding to the
> lookup keys,and maybe return multi rows, for example, when lookupkeys is
> id=1, then in the `TableFunction.eval`
>
> ```
>
> collect('SH')
>
> collect('BJ')
>
> collect('SD')
>
> ```
>
>
> Now I want to get the region'count by id, which is from the tblEvent.id,
> sql is :
>
>
> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
> tblEvent.id
>
>
> I expect the result of COUNT is always 3 for id = 1, no matter the id=1
> appears how many times.
>
> but the actual result is : 3, 6, 9, ...
>
>
> I think this is bcz the state mechanism behind COUNT, how to turn this off?
>
> Or what's the correct use for this?
> StreamQueryConfig.maxIdleStateRetentionTime or something?
>
>
> The reason not using state in flink:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
I implement my DimTable by extending `LookupTableSource`[1], which stores data 
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extend `TableFunction` to return the values corresponding to the 
lookup keys, possibly returning multiple rows. For example, when the lookup key is id=1, 
then in `TableFunction.eval`:

```

collect('SH')

collect('BJ')

collect('SD')

```
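
A minimal sketch of such a multi-row lookup function (the in-memory map and all names are
made up for illustration; a real lookup source would query an external store inside eval()):

```
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.functions.TableFunction;

public class RegionLookup extends TableFunction<String> {

    private static final Map<Long, List<String>> REGIONS = new HashMap<>();
    static {
        REGIONS.put(1L, Arrays.asList("SH", "BJ", "SD"));
    }

    public void eval(Long id) {
        for (String region : REGIONS.getOrDefault(id, Collections.emptyList())) {
            collect(region); // one output row per matching region
        }
    }
}
```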




Now I want to get the region count by id, which comes from tblEvent.id. The SQL 
is:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM_TIME 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT to always be 3 for id = 1, no matter how many times 
id=1 appears.

but the actual result is : 3, 6, 9, ...




I think this is because of the state mechanism behind COUNT; how can I turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason for not using state in Flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups

Re: Fault tolerance in Flink file Sink

2020-04-27 Thread Kostas Kloudas
Hi Eyal and Dawid,

@Eyal I think Dawid explained pretty well what is happening and why in
distributed settings, the underlying FS on which the StreamingFileSink
writes has to be durable and accessible to all parallel instances of
the job. Please let us know if you have any further questions.

Cheers,
Kostas

On Mon, Apr 27, 2020 at 9:52 AM Eyal Pe'er  wrote:
>
> Hi Dawid,
> Thanks for the very detailed answer and the correct assumptions (I am using 
> row format).
>
> I tried not using NFS/S3, but seems like it is the only option I have.
>
> Best regards
>
> Eyal Peer
>
> From: Dawid Wysakowicz 
> Sent: Friday, April 24, 2020 4:20 PM
> To: Eyal Pe'er ; user 
> Subject: Re: Fault tolerance in Flink file Sink
>
>
>
> Hi Eyal,
>
> First of all I would say a local filesystem is not a right choice for what 
> you are trying to achieve. I don't think you can achive a true exactly once 
> policy in this setup. Let me elaborate why.
>
> Let me clarify a bit how the StreamingFileSink works.  The interesting bit is 
> how it behaves on checkpoints. The behavior is controlled by a RollingPolicy. 
> As you have not said what format you use lets assume you use row format 
> first. For a row format the default rolling policy (when to change the file 
> from in-progress to pending) is it will be rolled if the file reaches 128MB, 
> the file is older than 60 sec or it has not been written to for 60 sec. It 
> does not roll on a checkpoint. Moreover StreamingFileSink considers the 
> filesystem as a durable sink that can be accessed after a restore. That 
> implies that it will try to append to this file when restoring from 
> checkpoint/savepoint.
>
> Even if you rolled the files on every checkpoint you still might face the 
> problem that you can have some leftovers because the StreamingFileSink moves 
> the files from pending to complete after the checkpoint is completed. If a 
> failure happens between finishing the checkpoint and moving the files it will 
> not be able to move them after a restore (it would do it if had an access).
>
> Lastly a completed checkpoint will contain offsets of records that were 
> processed successfully end-to-end, that means records that are assumed 
> committed by the StreamingFileSink. This can be records written to an 
> in-progress file with a pointer in a StreamingFileSink checkpointed metadata, 
> records in a "pending" file with an entry in a StreamingFileSink checkpointed 
> metadata that this file has been completed or records in "finished" files.[1]
>
> Therefore as you can see there are multiple scenarios when the 
> StreamingFileSink has to access the files after a restart.
>
> Last last thing, you mentioned "committing to the "bootstrap-server". Bear in 
> mind that Flink does not use offsets committed back to Kafka for guaranteeing 
> consistency. It can write those offsets back but only for 
> monitoring/debugging purposes. Flink stores/restores the processed offsets 
> from its checkpoints.[3]
>
> Let me know if it helped. I tried my best ;) BTW I highly encourage reading 
> the linked sources as they try to describe all that in a more structured way.
>
> I am also cc'ing Kostas who knows more about the StreamingFileSink than I 
> do., so he can maybe correct me somewhere.
>
>  Best,
>
> Dawid
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
>
> [3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> On 23/04/2020 12:11, Eyal Pe'er wrote:
>
> Hi all,
> I am using Flink streaming with Kafka consumer connector (FlinkKafkaConsumer) 
> and file Sink (StreamingFileSink) in a cluster mode with exactly once policy.
>
> The file sink writes the files to the local disk.
>
> I’ve noticed that if a job fails and automatic restart is on, the task 
> managers look for the leftovers files from the last failing job (hidden 
> files).
>
> Obviously, since the tasks can be assigned to different task managers, this 
> sums up to more failures over and over again.
>
> The only solution I found so far is to delete the hidden files and resubmit 
> the job.
>
> If I get it right (and please correct me If I wrong), the events in the 
> hidden files were not committed to the bootstrap-server, so there is no data 
> loss.
>
>
>
> Is there a way, forcing Flink to ignore the files that were written already? 
> Or maybe there is a better way to implement the solution (maybe somehow with 
> savepoints)?
>
>
>
> Best regards
>
> Eyal Peer
>
>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Zhijiang
Thanks Dian for the release work and thanks everyone involved. 

Best,
Zhijiang


--
From:Till Rohrmann 
Send Time:2020 Apr. 27 (Mon.) 15:13
To:Jingsong Li 
Cc:dev ; Leonard Xu ; Benchao Li 
; Konstantin Knauf ; jincheng 
sun ; Hequn Cheng ; Dian Fu 
; user ; user-zh 
; Apache Announce List 
Subject:Re: [ANNOUNCE] Apache Flink 1.9.3 released

Thanks Dian for being our release manager and thanks to everyone who helped
making this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li  wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> Great
>> > work!
>> >
>> > Konstantin Knauf  wrote on Sun, Apr 26, 2020 at 5:21 PM:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun > >
>> >> wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>>  Hequn Cheng  wrote on Sat, Apr 25, 2020 at 8:30 PM:
>> >>>
>>  @Dian, thanks a lot for the release and for being the release
>> manager.
>>  Also thanks to everyone who made this release possible!
>> 
>>  Best,
>>  Hequn
>> 
>>  On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> 
>> > Hi everyone,
>> >
>> > The Apache Flink community is very happy to announce the release of
>> > Apache Flink 1.9.3, which is the third bugfix release for the
>> Apache Flink
>> > 1.9 series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> > improvements for this bugfix release:
>> > https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >
>> > The full release notes are available in Jira:
>> > https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >
>> > We would like to thank all contributors of the Apache Flink
>> community
>> > who made this release possible!
>> > Also great thanks to @Jincheng for helping finalize this release.
>> >
>> > Regards,
>> > Dian
>> >
>> 
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica 
>> >>
>> >> --
>> >> Join Flink Forward  - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking
>> University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>



RE: Fault tolerance in Flink file Sink

2020-04-27 Thread Eyal Pe'er
Hi Dawid,
Thanks for the very detailed answer and the correct assumptions (I am using row 
format).
I tried not using NFS/S3, but it seems like it is the only option I have.
Best regards
Eyal Peer
From: Dawid Wysakowicz 
Sent: Friday, April 24, 2020 4:20 PM
To: Eyal Pe'er ; user 
Subject: Re: Fault tolerance in Flink file Sink


Hi Eyal,

First of all, I would say a local filesystem is not the right choice for what you 
are trying to achieve. I don't think you can achieve true exactly-once guarantees 
in this setup. Let me elaborate why.

Let me clarify a bit how the StreamingFileSink works. The interesting bit is how 
it behaves on checkpoints, and that behavior is controlled by a RollingPolicy. 
As you have not said which format you use, let's assume a row format first. 
For a row format, the default rolling policy (which decides when a file moves 
from in-progress to pending) rolls a file once it reaches 128 MB, is older than 
60 sec, or has not been written to for 60 sec. It does not roll on a checkpoint. 
Moreover, the StreamingFileSink considers the filesystem a durable sink that can 
be accessed after a restore, which implies that it will try to append to the 
in-progress file when restoring from a checkpoint/savepoint.
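
For reference, here is a minimal sketch of how those thresholds map onto the
row-format builder API (assuming Flink 1.10, an existing DataStream<String>
called stream, and a placeholder output path; adjust to your setup):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;

// Row-format sink whose rolling policy mirrors the defaults described above:
// roll at 128 MB, when the file is older than 60 s, or after 60 s of inactivity.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("file:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withMaxPartSize(128 * 1024 * 1024)
            .withRolloverInterval(TimeUnit.SECONDS.toMillis(60))
            .withInactivityInterval(TimeUnit.SECONDS.toMillis(60))
            .build())
    .build();

stream.addSink(sink);

Note that none of these triggers fire on a checkpoint, which is exactly the
behaviour described above.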

Even if you rolled the files on every checkpoint, you might still end up with 
leftovers, because the StreamingFileSink moves the files from pending to complete 
only after the checkpoint is completed. If a failure happens between finishing 
the checkpoint and moving the files, the sink will not be able to move them after 
a restore (it would do so if it still had access to them).

Lastly, a completed checkpoint will contain the offsets of records that were 
processed successfully end-to-end, that is, records that the StreamingFileSink 
considers committed. These can be records written to an in-progress file (with a 
pointer in the StreamingFileSink's checkpointed metadata), records in a "pending" 
file (with an entry in that checkpointed metadata marking the file as completed), 
or records in "finished" files.[1]

Therefore, as you can see, there are multiple scenarios in which the 
StreamingFileSink has to access the files after a restart.

One last thing: you mentioned "committing to the bootstrap-server". Bear in mind 
that Flink does not use the offsets committed back to Kafka for guaranteeing 
consistency. It can write those offsets back, but only for monitoring/debugging 
purposes. Flink stores and restores the processed offsets from its checkpoints.[3]
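
To make that distinction concrete, a small sketch (assuming the universal
FlinkKafkaConsumer and made-up topic, group and broker names):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The offsets Flink relies on for recovery are the ones snapshotted in these checkpoints.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
props.setProperty("group.id", "my-group");            // placeholder

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// Committing offsets back to Kafka is optional and only feeds external
// monitoring; it plays no role in Flink's exactly-once guarantees.
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer);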

Let me know if it helped. I tried my best ;) BTW I highly encourage reading the 
linked sources as they try to describe all that in a more structured way.

I am also cc'ing Kostas, who knows more about the StreamingFileSink than I do, 
so he can maybe correct me somewhere.

 Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

[3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
On 23/04/2020 12:11, Eyal Pe'er wrote:
Hi all,
I am using Flink streaming with Kafka consumer connector (FlinkKafkaConsumer) 
and file Sink (StreamingFileSink) in a cluster mode with exactly once policy.
The file sink writes the files to the local disk.
I've noticed that if a job fails and automatic restart is on, the task managers 
look for the leftover files from the last failing job (hidden files).
Obviously, since the tasks can be assigned to different task managers, this adds 
up to more failures over and over again.
The only solution I found so far is to delete the hidden files and resubmit the 
job.
If I get it right (and please correct me if I am wrong), the events in the hidden 
files were not committed to the bootstrap-server, so there is no data loss.

Is there a way to force Flink to ignore the files that were already written? Or 
maybe there is a better way to implement the solution (perhaps somehow with 
savepoints)?

Best regards
Eyal Peer



Re: Task Assignment

2020-04-27 Thread Marta Paes Moreira
Sorry — I didn't understand you were dealing with multiple keys.

In that case, I'd recommend you read about key-group assignment [1] and
check the KeyGroupRangeAssignment class [2].

Key groups are assigned to parallel tasks as ranges before the job is
started; this is also well-defined behaviour in Flink, with implications
for state reassignment on rescaling. I'm afraid that if you try to hardwire
this behaviour into your code, the job might not be transparently
rescalable anymore.

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
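
For illustration, a tiny sketch of how KeyGroupRangeAssignment maps a key to a
key group and then to a parallel subtask (the key, max parallelism and
parallelism below are made-up example values):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128; // example value
int parallelism = 4;      // example operator parallelism
String key = "user-42";   // example key

// First the key is hashed into one of maxParallelism key groups ...
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
// ... and the key group determines which parallel subtask receives the record.
int subtaskIndex =
    KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);

System.out.println("key group = " + keyGroup + ", subtask = " + subtaskIndex);

This is the same logic the runtime uses for keyed streams, which is why
hardwiring assumptions about it into user code tends to break on rescaling.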


On Fri, Apr 24, 2020 at 7:10 AM Navneeth Krishnan 
wrote:

> Hi Marta,
>
> Thanks for your response. What I'm looking for is something like data
> localization. If I have one TM that is processing a set of keys, I want to
> ensure all keys of the same type go to the same TM rather than using
> hashing to find the downstream slot. I could use a common key to do this,
> but I would have to parallelize as much as possible, since the number of
> incoming messages is too large to narrow down to a single key and
> process it.
>
> Thanks
>
> On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira 
> wrote:
>
>> Hi, Navneeth.
>>
>> If you *key* your stream using stream.keyBy(…), this will logically
>> split your input and all the records with the same key will be processed in
>> the same operator instance. This is the default behavior in Flink for keyed
>> streams and is handled transparently.
>>
>> You can read more about it in the documentation [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state
>>
>> On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Is there a way for an upstream operator to know how the downstream
>>> operator tasks are assigned? Basically I want to group my messages to be
>>> processed on slots in the same node based on some key.
>>>
>>> Thanks
>>>
>>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Till Rohrmann
Thanks Dian for being our release manager and thanks to everyone who helped
make this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> Great
>> > work!
>> >
>> > Konstantin Knauf wrote on Sun, Apr 26, 2020 at 5:21 PM:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun
>> >> wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>> >>> Hequn Cheng wrote on Sat, Apr 25, 2020 at 8:30 PM:
>> >>>
>>  @Dian, thanks a lot for the release and for being the release
>> manager.
>>  Also thanks to everyone who made this release possible!
>> 
>>  Best,
>>  Hequn
>> 
>>  On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> 
>> > Hi everyone,
>> >
>> > The Apache Flink community is very happy to announce the release of
>> > Apache Flink 1.9.3, which is the third bugfix release for the
>> Apache Flink
>> > 1.9 series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> > improvements for this bugfix release:
>> > https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >
>> > The full release notes are available in Jira:
>> > https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >
>> > We would like to thank all contributors of the Apache Flink
>> community
>> > who made this release possible!
>> > Also great thanks to @Jincheng for helping finalize this release.
>> >
>> > Regards,
>> > Dian
>> >
>> 
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica 
>> >>
>> >> --
>> >> Join Flink Forward  - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking
>> University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>