Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Salva Alcántara
Thanks Chesnay! Just to be clear, this is how my current code looks:

```
unionChannel = broadcastChannel.broadcast().union(singleChannel)

result = new DataStream<>(
    unionChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(unionChannel.getTransformation(), new MyDynamicPartitioner())
)
```

The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot
handle broadcast elements, as explained in the question description. So,
based on your reply, I guess I could do something like this:

```
resultSingleChannel = new DataStream<>(
    singleChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(singleChannel.getTransformation(), new MyDynamicPartitioner())
)

result = broadcastChannel.broadcast().union(resultSingleChannel)
```

I will give it a try and see if it works.






Re: Controlling the Materialization of JOIN updates

2020-01-03 Thread Kurt Young
Hi Benoît,

Before discussing all the options you listed, I'd like to understand more
about your requirements.

The part I don't fully understand is that both your fact (Event) and dimension
(DimensionAtJoinTimeX) tables come from the same table, Event or EventRawInput
in your case. As a result, both your fact and dimension tables change over time.

My understanding is that once your DimensionAtJoinTimeX table emits its results,
you don't want those results to change again. You want the fact table to join
only against whatever data the dimension table currently has? I'm asking
because your dimension table is calculated with a window aggregation, but
your join logic doesn't seem to care about the time attribute
(LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid = d1.uid). It's possible that
a record with uid=x arrives from the Event table while the dimension table
doesn't yet have any data for uid=x, due to the window aggregation. In this
case, you don't want them to join?

Best,
Kurt


On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all!
>
> I'm trying to design a stream pipeline, and have trouble controlling when
> a JOIN is triggering an update:
>
> Setup:
>
>- The Event table; "probe side", "query side", the result of earlier
>stream processing
>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>the results of earlier stream processing
>
> Joining them:
>
> SELECT *
> FROM Event e
> LEFT JOIN DimensionAtJoinTime1 d1
>   ON e.uid = d1.uid
> LEFT JOIN DimensionAtJoinTime2 d2
>   ON e.uid = d2.uid
>
> The DimensionAtJoinTimeX Tables being the result of earlier stream
> processing, possibly from the same Event table:
>
> SELECT   uid,
>  hop_start(...),
>  sum(...)
> FROM Event e
> GROUP BY uid,
>  hop(...)
>
> The Event Table being:
>
> SELECT ...
> FROM   EventRawInput i
> WHERE  i.some_field = 'some_value'
>
> Requirements:
>
>- I need the JOINs to only be executed once, only when a new line is
>appended to the probe / query / Event table.
>- I also need the full pipeline to be defined in SQL.
>- I very strongly prefer the Blink planner (mainly for Deduplication,
>TopN and LAST_VALUE features).
>
> Problem exploration so far:
>
>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>in SQL: it doesn't work out. But I might explore the following: insert
>DimensionAtJoinTimeX into a special Sink, and use it in a
>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>an external kv store?).
>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>missed something in the documentation.
>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
>It does not work with two tables [3], and I don't get to have the Blink
>planner features.
>- Option 4, "LATERAL TABLE table_function" [2], on the Blink planner:
>It does not work with the "probe side" being the results of earlier stream
>processing [4].
>- Option 5, let a regular JOIN materialize the updates, and somehow
>find how to filter the ones coming from the build sides (I'm at a loss on
>how to do that).
>- Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
>Relation"s; Speculating here: could there be a way, to say that the build
>side is not a TVR. Aka declare the stream as being somehow "static", while
>still being updated (but I guess we're back to "FOR SYSTEM_TIME AS OF").
>- Option 7: Are there any features being developed, or hints or
>workarounds to control the JOIN updates, that I have not considered so far?
>- Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the same
>bug nature, even though they occur in different situations on different
>planners (same Exception Stack Trace on files that have the same historical
>parent before the Blink fork). FLINK-15112 has a workaround, but
>FLINK-14200 does not. The existence of that workaround IMHO signals that
>there is a simple fix for both bugs. I have tried to find it in Flink for a
>few days, but no success so far. If you guys have pointers helping me
>provide a fix, I'll gladly listen. So far I have progressed to: It revolves
>around Calcite-based Flink streaming rules transforming a temporal table
>function correlate into a Join on 2*Scan, and crashes when it encounters
>something that is not a table that can be readily scanned. Also, there are
>shenanigans on trying to find the right schema in the Catalog. But I am
>blocked now, and not accustomed to the Flink internal code (would like to
>though, if Alibaba/Ververica are recruiting remote workers, wink wink,
>nudge nudge).
>
> All opinions very much welcomed on all Options and Remarks!
>
> Cheers, and a hap

Re: Flink group with time-windowed join

2020-01-03 Thread Kurt Young
Looks like a bug to me, could you file an issue for this?

Best,
Kurt


On Thu, Jan 2, 2020 at 9:06 PM jeremyji <18868129...@163.com> wrote:

> We have two streams, table1 and table2. We know that GROUP BY with a regular join
> won't work, so we have to use a time-windowed join. Here is what my Flink SQL
> looks like:
>
> SELECT
> a.account account,
> SUM(a.value) + SUM(b.value),
> UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3'
> MINUTE))
> FROM
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table1) a,
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table2) b
> WHERE
> a.account = b.account AND
> a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3'
> MINUTE AND b.producer_timestamp)
> group by
> a.account,
> TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
> But I still get this error from Flink:
>
> /Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
>
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
> at
>
> org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
> at
>
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
> /
> I think I am using the time-windowed join just like this doc says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
> But Flink tells me it is a regular join. Is there anything wrong that I haven't
> noticed?
>
>
>
>


Re: Duplicate tasks for the same query

2020-01-03 Thread Kurt Young
Hi RKandoji,

It looks like you have a data skew issue in your input data. Some, or maybe
only one, "userId" appears more frequently than the others. For the join
operator to work correctly, Flink applies a "shuffle by join key" before the
operator, so the same "userId" always goes to the same sub-task to perform the
join. In this case, I'm afraid there is not much you can do for now.

BTW, for the deduplication, do you keep the latest record or the earliest? If
you keep the latest version, Flink will trigger a retraction and then send the
latest record again every time your user table changes.
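
For reference, a keep-the-earliest deduplication with the Blink planner would
look roughly like this (column and table names follow the Users example from
earlier in this thread; ORDER BY ... ASC keeps the first row and produces an
append-only stream, DESC keeps the latest and produces retractions):

SELECT userId, userTimestamp   -- plus whatever other columns you need
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY userTimestamp ASC) AS row_num
    FROM Users
)
WHERE row_num = 1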

Best,
Kurt


On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:

> Hi,
>
> Thanks a ton for the help with earlier questions, I updated code to
> version 1.9 and started using Blink Planner (DeDuplication). This is
> working as expected!
>
> I have a new question, but thought of asking in the same email chain as
> this has more context about my use case etc.
>
> Workflow:
> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
> input data, performing JOINs and writing the joined data to another Kafka
> topic.
>
> Issue:
> I set Parallelism to 8 and on analyzing the subtasks found that the data
> is not distributed well among 8 parallel tasks for the last Join query. One
> of a subtask is taking huge load, whereas others taking pretty low load.
>
> Tried a couple of things below, but no use. Not sure if they are actually
> related to the problem as I couldn't yet understand what's the issue here.
> 1. increasing the number of partitions of output Kafka topic.
> 2. tried adding keys to output so key partitioning happens at Kafka end.
>
> Below is a snapshot for reference:
> [image: image.png]
>
> Below are the config changes I made:
>
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 8
> jobmanager.heap.size: 5000m
> taskmanager.heap.size: 5000m
> state.backend: rocksdb
> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
> state.backend.incremental: true
>
> I don't see any errors and job seems to be running smoothly (and slowly).
> I need to make it distribute the load well for faster processing, any
> pointers on what could be wrong and how to fix it would be very helpful.
>
> Thanks,
> RKandoji
>
>
> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>
>> Thanks!
>>
>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
>> wrote:
>>
>>> Yes,
>>>
>>> 1.9.2 or Coming soon 1.10
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>>
 Ok thanks, does it mean version 1.9.2 is what I need to use?

 On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
 wrote:

> Blink planner was introduced in 1.9. We recommend use blink planner
> after 1.9.
> After some bug fix, I think the latest version of 1.9 is OK. The
> production environment has also been set up in some places.
>
> Best,
> Jingsong Lee
>
> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>
>> Thanks Jingsong and Kurt for more details.
>>
>> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
>> version 1.9. Hopefully deduplication is done by only one task and reused
>> everywhere else.
>>
>> One more follow-up question, I see "For production use cases, we
>> recommend the old planner that was present before Flink 1.9 for now." 
>> warning
>> here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>> This is actually the reason why started with version 1.8, could you
>> please let me know your opinion about this? and do you think there is any
>> production code running on version 1.9
>>
>> Thanks,
>> Reva
>>
>>
>>
>>
>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>>
>>> BTW, you could also have a more efficient version of deduplicating
>>> user table by using the topn feature [1].
>>>
>>> Best,
>>> Kurt
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>
>>>
>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
>>> wrote:
>>>
 Hi RKandoji,

 In theory, you don't need to do something.
 First, the optimizer will optimize by doing duplicate nodes.
 Second, after SQL optimization, if the optimized plan still has
 duplicate nodes, the planner will automatically reuse them.
 There are config options to control whether we should reuse plan,
 their default value is true. So you don't need modify them.
 - table.optimizer.reuse-sub-plan-enabled
 - table.optimizer.reuse-source-enabled

 Best,
 Jingsong Lee

 On Tue, Dec 31, 2019 at 6:29 AM RKandoji 
 wrote:

> Thanks Terry and Jingsong,
>
> Currently I'm on 1.8 version using Flink planner for stream
> proessing, I'll swit

Flink logging issue with logback

2020-01-03 Thread Bajaj, Abhinav
Hi,

I am investigating a logging issue with Flink.

Setup

  *   Using Flink 1.7.1 with logback, as suggested in the Flink documentation here.
  *   Submitting the Flink job from the Flink dashboard.

Observations

  *   Logs from the main method (outside of the job graph) do not show up in the jobmanager logs.
  *   Logs from operators like map or custom operators do show up in the taskmanager logs.
  *   Logs from the main method do show up in the jobmanager logs when using log4j in place of logback.

Has anyone else noticed similar behavior or is this a known issue with logback 
integration in Flink?
Any suggestions on potential workaround or fix?

Appreciate your time and help.

~ Abhinav Bajaj



Table API: Joining on Tables of Complex Types

2020-01-03 Thread Hailu, Andreas
Hi folks,

I'm trying to join two Tables which are composed of complex types, Avro's
GenericRecord to be exact. I have to use a custom UDF to extract fields out of
the record, and I'm having some trouble figuring out how to do joins on them,
as I need to call this UDF to read what I need. Example below:

batchTableEnvironment.registerFunction("getField", new GRFieldExtractor()); // GenericRecord field extractor
Table users = batchTableEnvironment.fromDataSet(usersDataset); // Converting from some pre-existing DataSet
Table otherDataset = batchTableEnvironment.fromDataSet(someOtherDataset);
Table userNames = users.select("getField(f0, userName)"); // This is how the UDF is used, as GenericRecord is a complex type requiring you to invoke a get() method on the field you're interested in. Here we get the 'userName' field.

I'd like to do something using the Table API similar to the query "SELECT * 
from otherDataset WHERE otherDataset.userName = users.userName". How is this 
done?
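
Something like the sketch below is the shape I have in mind (untested; the
aliases are just illustrative): project the extracted field into a named
column on each side, then join on the aliases.

Table usersByName = users.select("getField(f0, userName) as u_userName, f0 as u_record");
Table othersByName = otherDataset.select("getField(f0, userName) as o_userName, f0 as o_record");
Table joined = usersByName.join(othersByName, "u_userName = o_userName"); // sketch only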

Best,
Andreas



Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Hi,

Thanks a ton for the help with earlier questions, I updated code to version
1.9 and started using Blink Planner (DeDuplication). This is working as
expected!

I have a new question, but thought of asking in the same email chain as
this has more context about my use case etc.

Workflow:
Currently I'm reading from a couple of Kafka topics, DeDuplicating the
input data, performing JOINs and writing the joined data to another Kafka
topic.

Issue:
I set parallelism to 8 and, on analyzing the subtasks, found that the data is
not distributed well among the 8 parallel tasks for the last join query. One
subtask is taking a huge load, whereas the others take a pretty low load.

I tried the couple of things below, but with no luck. I'm not sure they are
actually related to the problem, as I haven't yet understood what the issue is.
1. increasing the number of partitions of the output Kafka topic.
2. adding keys to the output so key partitioning happens on the Kafka end.

Below is a snapshot for reference:
[image: image.png]

Below are the config changes I made:

taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
jobmanager.heap.size: 5000m
taskmanager.heap.size: 5000m
state.backend: rocksdb
state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
state.backend.incremental: true

I don't see any errors and the job seems to be running smoothly (though slowly).
I need to make it distribute the load well for faster processing; any
pointers on what could be wrong and how to fix it would be very helpful.

Thanks,
RKandoji


On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:

> Thanks!
>
> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li  wrote:
>
>> Yes,
>>
>> 1.9.2 or Coming soon 1.10
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>
>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>
>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>>> wrote:
>>>
 Blink planner was introduced in 1.9. We recommend use blink planner
 after 1.9.
 After some bug fix, I think the latest version of 1.9 is OK. The
 production environment has also been set up in some places.

 Best,
 Jingsong Lee

 On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:

> Thanks Jingsong and Kurt for more details.
>
> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
> version 1.9. Hopefully deduplication is done by only one task and reused
> everywhere else.
>
> One more follow-up question, I see "For production use cases, we
> recommend the old planner that was present before Flink 1.9 for now." 
> warning
> here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>
> This is actually the reason why started with version 1.8, could you
> please let me know your opinion about this? and do you think there is any
> production code running on version 1.9
>
> Thanks,
> Reva
>
>
>
>
> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>
>> BTW, you could also have a more efficient version of deduplicating
>> user table by using the topn feature [1].
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>
>>
>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> In theory, you don't need to do something.
>>> First, the optimizer will optimize by doing duplicate nodes.
>>> Second, after SQL optimization, if the optimized plan still has
>>> duplicate nodes, the planner will automatically reuse them.
>>> There are config options to control whether we should reuse plan,
>>> their default value is true. So you don't need modify them.
>>> - table.optimizer.reuse-sub-plan-enabled
>>> - table.optimizer.reuse-source-enabled
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>>>
 Thanks Terry and Jingsong,

 Currently I'm on 1.8 version using Flink planner for stream
 proessing, I'll switch to 1.9 version to try out blink planner.

 Could you please point me to any examples (Java preferred) using
 SubplanReuser?

 Thanks,
 RK

 On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <
 jingsongl...@gmail.com> wrote:

> Hi RKandoji,
>
> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>
>       Join                     Join
>       /  \                     /  \
>  Filter1  Filter2         Filter1  Filter2
>     |        |      =>        \    /
>  Project1  Project2          Project1
>     |        |                  |
>   Scan1     Scan2             Scan1
>
>
> [1]
> https://github.com/apache/flink/blob/m

Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Thanks!

On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li  wrote:

> Yes,
>
> 1.9.2 or Coming soon 1.10
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>
>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>
>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>> wrote:
>>
>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>> after 1.9.
>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>> production environment has also been set up in some places.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>>>
 Thanks Jingsong and Kurt for more details.

 Yes, I'm planning to try out DeDuplication when I'm done upgrading to
 version 1.9. Hopefully deduplication is done by only one task and reused
 everywhere else.

 One more follow-up question, I see "For production use cases, we
 recommend the old planner that was present before Flink 1.9 for now." 
 warning
 here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
 This is actually the reason why started with version 1.8, could you
 please let me know your opinion about this? and do you think there is any
 production code running on version 1.9

 Thanks,
 Reva




 On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:

> BTW, you could also have a more efficient version of deduplicating
> user table by using the topn feature [1].
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>
>
> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
> wrote:
>
>> Hi RKandoji,
>>
>> In theory, you don't need to do something.
>> First, the optimizer will optimize by doing duplicate nodes.
>> Second, after SQL optimization, if the optimized plan still has
>> duplicate nodes, the planner will automatically reuse them.
>> There are config options to control whether we should reuse plan,
>> their default value is true. So you don't need modify them.
>> - table.optimizer.reuse-sub-plan-enabled
>> - table.optimizer.reuse-source-enabled
>>
>> Best,
>> Jingsong Lee
>>
>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>>
>>> Thanks Terry and Jingsong,
>>>
>>> Currently I'm on 1.8 version using Flink planner for stream
>>> proessing, I'll switch to 1.9 version to try out blink planner.
>>>
>>> Could you please point me to any examples (Java preferred) using
>>> SubplanReuser?
>>>
>>> Thanks,
>>> RK
>>>
>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
>>> wrote:
>>>
 Hi RKandoji,

 FYI: Blink-planner subplan reusing: [1] 1.9 available.

       Join                     Join
       /  \                     /  \
  Filter1  Filter2         Filter1  Filter2
     |        |      =>        \    /
  Project1  Project2          Project1
     |        |                  |
   Scan1     Scan2             Scan1


 [1]
 https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

 Best,
 Jingsong Lee

 On Mon, Dec 30, 2019 at 12:28 PM Terry Wang 
 wrote:

> Hi RKandoji~
>
> Could you provide more info about your poc environment?
> Stream or batch? Flink planner or blink planner?
> AFAIK, blink planner has done some optimization to deal such
> duplicate task for one same query. You can have a try with blink 
> planner :
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Terry Wang
>
>
>
> 2019年12月30日 03:07,RKandoji  写道:
>
> Hi Team,
>
> I'm doing a POC with flink to understand if it's a good fit for my
> use case.
>
> As part of the process, I need to filter duplicate items and
> created below query to get only the latest records based on 
> timestamp. For
> instance, I have "Users" table which may contain multiple messages 
> for the
> same "userId". So I wrote below query to get only the latest message 
> for a
> given "userId"
>
> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE
> (userId, userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM 
> Users
> GROUP BY userId)");
>
> The above query works as expected and contains only the latest
> users based on timestamp.
>
> The issue is when I use "uniqueUs

Re: Flink task node shut it self off.

2020-01-03 Thread John Smith
Well, there was this huge IO wait, with a spike of over 140%. IO wait rose
slowly for a couple of hours, then at some point it spiked at 140%, and after
the IO wait dropped back to "normal" the 1min/5min/15min CPU load spiked to
about 3 times the number of cores for a bit.

We were at "peak" operation, i.e. we were running a batch job when this
happened. In average operation the "business" requests per second from our
services are about 15 RPS; when we do batches we can hit 600 RPS for a few
hours and then go back down. Each business request underneath does a few round
trips back and forth between Kafka, cache systems, Flink, DBs etc... So the
Flink jobs are a subset of some parts of that 600 RPS.

On the Flink side we have 3 task managers with 4 cores and 8GB each, configured
as 8 slots, 5.4GB JVM, and 3.77GB Flink managed memory per task manager. We have
8 jobs and 9 slots free, so the cluster isn't full yet. But we do see that one
node is full.

We use disk FS state (backed by GlusterFS), not RocksDB. We had enabled
5-second checkpointing for 6 of the jobs... So I'm just wondering if that was
possibly the reason for the IO wait... But regardless of the RPS mentioned
above, the jobs will always checkpoint every 5 seconds... I had the chance
to increase the checkpoint interval for a few of the jobs before the holidays.
I am back on Monday...
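
(For reference, this is roughly the interval change I mean; the values are
just examples, not what we actually deployed:)

// sketch: widen the checkpoint interval and space checkpoints apart
env.enableCheckpointing(60_000);                                 // checkpoint every 60 s instead of every 5 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // leave at least 30 s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000);         // give a checkpoint up to 2 min to complete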

On Fri., Jan. 3, 2020, 11:16 a.m. Chesnay Schepler, 
wrote:

> The logs show 2 interesting pieces of information:
>
> 
> ...
> 2019-12-19 18:33:23,278 INFO
> org.apache.kafka.clients.FetchSessionHandler  - [Consumer
> clientId=consumer-4, groupId=ccdb-prod-import] Error sending fetch
> request (sessionId=INVALID, epoch=INITIAL) to node 0:
> org.apache.kafka.common.errors.DisconnectException.
> ...
> 2019-12-19 19:37:06,732 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address 
> akka.tcp://flink@xx-job-0002:36835/user/resourcemanager,
> retrying in 1 ms: Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/),
> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
> of type "akka.actor.Identify"..
>
> This reads like the machine lost network connectivity for some reason. The
> tasks start failing because kafka cannot be reached, and the TM then shuts
> down because it can neither reach the ResourceManager.
>
> On 25/12/2019 04:34, Zhijiang wrote:
>
> If you use rocksDB state backend, it might consume extra native memory.
> Some resource framework cluster like yarn would kill the container if the
> memory usage exceeds some threshold. You can also double check whether it
> exists in your case.
>
> --
> From:John Smith  
> Send Time:2019 Dec. 25 (Wed.) 03:40
> To:Zhijiang  
> Cc:user  
> Subject:Re: Flink task node shut it self off.
>
> The shutdown happened after the massive IO wait. I don't use any state
> Checkpoints are disk based...
>
> On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, 
> wrote:
> Hi John,
>
> Thanks for the positive comments of Flink usage. No matter at least-once
> or exactly-once you used for checkpoint, it would never lose one message
> during failure recovery.
>
> Unfortunatelly I can not visit the logs you posted. Generally speaking the
> longer internal checkpoint would mean replaying more source data after
> failure recovery.
> In my experience the 5 seconds interval for checkpoint is too frequently
> in my experience, and you might increase it to 1 minute or so. You can also
> monitor how long will the checkpoint finish in your application, then you
> can adjust the interval accordingly.
>
> Concerning of the node shutdown you mentioned, I am not quite sure whether
> it is relevant to your short checkpoint interval. Do you config to use heap
> state backend?  The hs_err file really indicated that you job had
> encountered the memory issue, then it is better to somehow increase your
> task manager memory. But if you can analyze the dump hs_err file via some
> profiler tool for checking the memory usage, it might be more helpful to
> find the root cause.
>
> Best,
> Zhijiang
>
> --
> From:John Smith 
> Send Time:2019 Dec. 21 (Sat.) 05:26
> To:user 
> Subject:Flink task node shut it self off.
>
> Hi, using Flink 1.8.0
>
> 1st off I must say Flink resiliency is very impressive, we lost a node and
> never lost one message by using checkpoints and Kafka. Thanks!
>
> The cluster is a self hosted cluster and we use our own zookeeper cluster.
> We have...
> 3 zookeepers: 4 cpu, 8GB (each)
> 3 job nodes: 4 cpu, 8GB (each)
> 3 task nodes: 4 cpu, 8GB (each)
> The nodes also share GlusterFS for storing savepoints and checkpoints,
> GlusterFS is running on the same machines.
>
> Yesterday a node shut itself off with the following log messages...
> - Stopping TaskExecutor
> akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
> - Stop job leader service.
> - Sto

Re: Checkpoints issue and job failing

2020-01-03 Thread Navneeth Krishnan
Thanks Congxian & Vino.

Yes, the file does exist and I don't see any problem accessing it.

Regarding Flink 1.9, we haven't migrated yet but we are planning to.
Since we have to test it, it might take some time.

Thanks

On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu  wrote:

> Hi
>
> Do you have ever check that this problem exists on Flink 1.9?
>
> Best,
> Congxian
>
>
> vino yang  于2020年1月3日周五 下午3:54写道:
>
>> Hi Navneeth,
>>
>> Did you check if the path contains in the exception is really can not be
>> found?
>>
>> Best,
>> Vino
>>
>> Navneeth Krishnan  于2020年1月3日周五 上午8:23写道:
>>
>>> Hi All,
>>>
>>> We are running into checkpoint timeout issue more frequently in
>>> production and we also see the below exception. We are running flink 1.4.0
>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>>> this?
>>>
>>> [image: image.png]
>>>
>>> java.lang.IllegalStateException: Could not initialize operator state 
>>> backend.
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.FileNotFoundException: 
>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>>  (No such file or directory)
>>> at java.io.FileInputStream.open0(Native Method)
>>> at java.io.FileInputStream.open(FileInputStream.java:195)
>>> at java.io.FileInputStream.(FileInputStream.java:138)
>>> at 
>>> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
>>>
>>>
>>> Thanks
>>>
>>>


Re: Flink task node shut it self off.

2020-01-03 Thread Chesnay Schepler

The logs show 2 interesting pieces of information:


...
2019-12-19 18:33:23,278 INFO 
org.apache.kafka.clients.FetchSessionHandler  - 
[Consumer clientId=consumer-4, groupId=ccdb-prod-import] Error 
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: 
org.apache.kafka.common.errors.DisconnectException.

...
2019-12-19 19:37:06,732 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Could 
not resolve ResourceManager address 
akka.tcp://flink@xx-job-0002:36835/user/resourcemanager, retrying in 
1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/), 
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent 
message of type "akka.actor.Identify"..


This reads like the machine lost network connectivity for some reason. 
The tasks start failing because Kafka cannot be reached, and the TM then 
shuts down because it cannot reach the ResourceManager either.


On 25/12/2019 04:34, Zhijiang wrote:

If you use rocksDB state backend, it might consume extra native memory.
Some resource framework cluster like yarn would kill the container if 
the memory usage exceeds some threshold. You can also double check 
whether it exists in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any
state Checkpoints are disk based...

On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang,
mailto:wangzhijiang...@aliyun.com>>
wrote:
Hi John,

Thanks for the positive comments of Flink usage. No matter
at least-once or exactly-once you used for checkpoint, it would
never lose one message during failure recovery.

Unfortunatelly I can not visit the logs you posted. Generally
speaking the longer internal checkpoint would mean replaying more
source data after failure recovery.
In my experience the 5 seconds interval for checkpoint is too
frequently in my experience, and you might increase it to 1 minute
or so. You can also monitor how long will the checkpoint finish in
your application, then you can adjust the interval accordingly.

Concerning of the node shutdown you mentioned, I am not quite sure
whether it is relevant to your short checkpoint interval. Do you
config to use heap state backend? The hs_err file really indicated
that you job had encountered the memory issue, then it is better
to somehow increase your task manager memory. But if you can
analyze the dump hs_err file via some profiler tool for checking
the memory usage, it might be more helpful to find the root cause.

Best,
Zhijiang

--
From:John Smith mailto:java.dev@gmail.com>>
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user mailto:user@flink.apache.org>>
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a
node and never lost one message by using checkpoints and Kafka.
Thanks!

The cluster is a self hosted cluster and we use our own zookeeper
cluster. We have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and
checkpoints, GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService
/leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140%
and CPU load 1minute of 15. And we also got an hs_err file which
sais we should increase the memory.

I'm attaching the logs here:
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?










Re: Change Akka Ask Timeout for Job Submission Only

2020-01-03 Thread Chesnay Schepler

There are 3 communication layers involved here:

1) client <=> server (REST API)

This goes through REST and does not use timeouts AFAIK. We wait until a 
response comes or the connection terminates.


2) server (REST API) <=> processes (JM, Dispatcher)

This goes through akka, with "web.timeout" being used for the timeout.

3) processes <=> processes

Also akka, with "akka.ask.timeout" being used.


The timeout in question occurs on layer 2), due to the JM being 
incredibly busy, possibly because of some heavy-weight computation during 
job setup.

In any case, you can try increasing web.timeout to maybe resolve this issue.
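
For example, in flink-conf.yaml (the values below are only illustrative):

web.timeout: 120000        # milliseconds; REST handlers <=> JobManager/Dispatcher asks
akka.ask.timeout: 60 s     # process <=> process asks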


On 20/12/2019 06:13, tison wrote:

Forward to user list.

Best,
tison.


Abdul Qadeer mailto:quadeer@gmail.com>> 
于2019年12月20日周五 下午12:57写道:


Around submission time, logs from jobmanager:


{"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received
JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a

(sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M

{"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting
job 714829e8f6c8cd0daaed335c1b8c588a

(sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M

{"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/user/jobmanager_4#-2122695705] was not
delivered. [87] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and

'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M

{"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/user/jobmanager_4#-2122695705] was not
delivered. [88] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and

'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M

{"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled
exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask
timed out on [Actor[akka://flink/user/dispatcher#1899316777]]
after [1 ms]. Sender[null] sent message of type

\"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask
timed out on [Actor[akka://flink/user/dispatcher#1899316777]]
after [1 ms]. Sender[null] sent message of type

\"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,

Re: kafka: how to stop consumption temporarily

2020-01-03 Thread Chesnay Schepler
Are you asking how to detect from within the job whether the dump is 
complete, or how to combine these 2 jobs?


If you had a way to notice whether the dump is complete, then I would 
suggest creating a custom source that wraps the 2 Kafka sources and switches 
between them based on your condition.
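
A very rough sketch of that idea is below. Note that it does not literally
wrap two FlinkKafkaConsumers (whose lifecycle is hard to delegate to); it uses
the plain Kafka client instead, so you give up Flink-managed offsets and
exactly-once, and the end-of-dump marker is an application-specific assumption:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: drains the dump topic first, then switches to the CDC topic.
// Offsets are handled by the Kafka client itself, not by Flink checkpoints.
public class DumpThenCdcSource extends RichSourceFunction<String> {

    private final Properties kafkaProps;
    private final String dumpTopic;
    private final String cdcTopic;
    private volatile boolean running = true;

    public DumpThenCdcSource(Properties kafkaProps, String dumpTopic, String cdcTopic) {
        this.kafkaProps = kafkaProps;
        this.dumpTopic = dumpTopic;
        this.cdcTopic = cdcTopic;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            // phase 1: drain the dump topic until the end-of-dump condition is seen
            consumer.subscribe(Collections.singletonList(dumpTopic));
            boolean dumpDone = false;
            while (running && !dumpDone) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    ctx.collect(record.value());
                    dumpDone = isEndOfDump(record);
                }
            }
            // phase 2: switch to the CDC topic and consume it indefinitely
            consumer.unsubscribe();
            consumer.subscribe(Collections.singletonList(cdcTopic));
            while (running) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    ctx.collect(record.value());
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // hypothetical helper: however your dump signals completion (marker record, count, ...)
    private boolean isEndOfDump(ConsumerRecord<String, String> record) {
        return "END_OF_DUMP".equals(record.value());
    }
}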



On 03/01/2020 03:53, Terry Wang wrote:

Hi,

I’d like to share my opinion here. It seems that you need to adjust the Kafka 
consumers so they can communicate with each other. When you begin the dump 
process, you need to notify the CDC-topic consumer to wait idle.


Best,
Terry Wang




2020年1月2日 16:49,David Morin  写道:

Hi,

Is there a way to temporarily stop consuming one Kafka source in streaming
mode?
Use case: I have to consume 2 topics, but one of them has higher priority.
One of these topics is dedicated to ingesting data from a DB (change data capture) and 
the other is dedicated to synchronization (a dump, i.e. a SELECT ... from the DB). 
At the moment the latter is performed by a separate Flink job, and we start it 
after manually stopping the previous one (CDC).
I want to merge these 2 modes and automatically stop consumption of the topic 
dedicated to the CDC mode while a dump is being done.
How can I handle that with Flink in a streaming way? Backpressure? ...
Thx in advance for your insights

David






Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Chesnay Schepler
You should be able to implement this on the DataStream API level using 
DataStream#broadcast and #union like this:


input = ...

singleChannel = input.filter(x -> !x.isBroadCastPartitioning);

broadcastChannel = input.filter(x -> x.isBroadCastPartitioning);

result = broadcastChannel.broadcast().union(singleChannel)

// apply operations on result
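
If the non-broadcast elements still need your custom routing, you can additionally
apply it to the single channel before the union. A rough sketch (assuming the key
is reachable via value.f0.getPartitionKey() as in your code, and that "partitioner"
is the partitioner instance MyDynamicPartitioner already uses internally):

routedSingleChannel = singleChannel.partitionCustom(
        (Partitioner<Integer>) (key, numPartitions) ->
                key == -1 ? ThreadLocalRandom.current().nextInt(numPartitions)
                          : partitioner.partition(key, numPartitions),
        value -> value.f0.getPartitionKey());

// the union above then becomes:
result = broadcastChannel.broadcast().union(routedSingleChannel);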


On 26/12/2019 08:20, Salva Alcántara wrote:

I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
    // Original: working for Flink 1.7
    //@Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
                                int numberOfOutputChannels) {
        T value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            // send to all channels
            int[] channels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; ++i) {
                channels[i] = i;
            }
            return channels;
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfOutputChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
        }
        return returnChannels;
    }

```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
    // New: required by Flink 1.9
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
        T value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            /*
            It is illegal to call this method for broadcast channel selectors and this method can remain
            not implemented in that case (for example by throwing UnsupportedOperationException).
            */
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
        }
        //return returnChannels;
        return returnChannels[0];
    }
```

Note that `selectChannels` has been replaced with `selectChannel`. So, it is
no longer possible to return multiple output channels as originally done
above for the case of broadcasted elements. As a matter of fact,
`selectChannel` should not be invoked for this particular case. Any thoughts
on how to tackle this?










Re: Yarn Kerberos issue

2020-01-03 Thread Chesnay Schepler
From what I understand from the documentation, if you want to use 
delegation tokens you always first have to issue a ticket using kinit; 
so you did everything correctly?
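
For reference, the two usual setups look roughly like this (principal, realm
and paths are placeholders):

# option 1: rely on a ticket cache populated before submission
#           (security.kerberos.login.use-ticket-cache is true by default)
kinit user@EXAMPLE.COM

# option 2: let Flink log in from a keytab itself, in flink-conf.yaml
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@EXAMPLE.COM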


On 02/01/2020 13:00, Juan Gentile wrote:


Hello,

I’m trying to submit a job (batch wordcount) to a YARN cluster. I’m 
trying to use delegation tokens and I’m getting the following error:


/org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster/


/at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)/


/at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)/


/at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)/

/at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)/


/at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)/


/at java.security.AccessController.doPrivileged(Native Method)/

/at javax.security.auth.Subject.doAs(Subject.java:422)/

/at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)/


/at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)/


/at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)/

/Caused by: 
org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation 
Token can be issued only with kerberos or web authentication/


/at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)/


/at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)/


/at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)/


/at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)/


/at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)/


/at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)/


/at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)/

/at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)/

/at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)/

/at java.security.AccessController.doPrivileged(Native Method)/

/at javax.security.auth.Subject.doAs(Subject.java:422)/

/at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)/


/at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215) /

/at org.apache.hadoop.ipc.Client.call(Client.java:1472)/

/at org.apache.hadoop.ipc.Client.call(Client.java:1409)/

/at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)/


/at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)/

/at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)/


/at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)/

/at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)/


/at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)/


/at java.lang.reflect.Method.invoke(Method.java:498)/

/at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)/


/at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)/


/at com.sun.proxy.$Proxy19.getDelegationToken(Unknown Source)/

/at 
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)/


/at 
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)/


/at 
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)/


/at 
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)/


/at 
org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)/


/at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)/


/at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)/


/at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)/


/at org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)/

/at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)/


/at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)/


/at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)/


//

The kerberos configuration in this case is the default one. Then I 
tried with this option set to false 
‘security.k

Re: Late outputs for Session Window

2020-01-03 Thread KristoffSC
After following suggestion from SO
I added few changes, so now I'm using Event Time
Water marks are progressing, I've checked them in Flink's metrics. The
Window operator is triggered but still I don't see any late outputs for
this. 


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
                new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");

messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");








Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jark Wu
Hi Benoît,

Thanks for the reminder. I will look into the issue and hopefully we can
target it for 1.9.2 and 1.10.

Cheers,
Jark

On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> >  If anyone finds that blink planner has any significant defects and has
> a larger regression than the old planner, please let us know.
>
> Overall, the Blink-exclusive features are must (TopN, deduplicate,
> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
> production are not covered:
> An edge case of Temporal Table Functions does not allow computed Tables
> (as opposed to TableSources) to be used on the query side in Blink (
> https://issues.apache.org/jira/browse/FLINK-14200)
>
> Cheers
> Ben
>
>
> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>
>> +1, I have already made blink as the default planner of flink interpreter
>> in Zeppelin
>>
>>
>> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>>
>>> Hi Jark,
>>>
>>> +1 for default blink planner in SQL-CLI.
>>> I believe this new planner can be put into practice in production.
>>> We've worked hard for nearly a year, but the old planner didn't move on.
>>>
>>> And I'd like to cc to user@flink.apache.org.
>>> If anyone finds that blink planner has any significant defects and has a
>>> larger regression than the old planner, please let us know. We will be very
>>> grateful.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>>
 +1 for this.
 We bring many SQL/API features and enhance stability in 1.10 release,
 and almost all of them happens in Blink planner.
 SQL CLI is the most convenient entrypoint for me, I believe many users
 will have a better experience If we set Blink planner as default planner.

 Best,
 Leonard

 > 在 2020年1月3日,15:16,Terry Wang  写道:
 >
 > Since what blink planner can do is a superset of flink planner, big
 +1 for changing the default planner to Blink planner from my side.
 >
 > Best,
 > Terry Wang
 >
 >
 >
 >> 2020年1月3日 15:00,Jark Wu  写道:
 >>
 >> Hi everyone,
 >>
 >> In 1.10 release, Flink SQL supports many awesome features and
 improvements,
 >> including:
 >> - support watermark statement and computed column in DDL
 >> - fully support all data types in Hive
 >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
 >> - support INSERT OVERWRITE and INSERT PARTITION
 >>
 >> However, all the features and improvements are only avaiable in Blink
 >> planner, not in Old planner.
 >> There are also some other features are limited in Blink planner, e.g.
 >> Dimension Table Join [1],
 >> TopN [2], Deduplicate [3], streaming aggregates optimization [4],
 and so on.
 >>
 >> But Old planner is still the default planner in Table API & SQL. It
 is
 >> frustrating for users to set
 >> to blink planner manually when every time start a SQL CLI. And it's
 >> surprising to see unsupported
 >> exception if they trying out the new features but not switch planner.
 >>
 >> SQL CLI is a very important entrypoint for trying out new feautures
 and
 >> prototyping for users.
 >> In order to give new planner more exposures, I would like to suggest
 to set
 >> default planner
 >> for SQL Client to Blink planner before 1.10 release.
 >>
 >> The approach is just changing the default SQL CLI yaml
 configuration[5]. In
 >> this way, the existing
 >> environment is still compatible and unaffected.
 >>
 >> Changing the default planner for the whole Table API & SQL is
 another topic
 >> and is out of scope of this discussion.
 >>
 >> What do you think?
 >>
 >> Best,
 >> Jark
 >>
 >> [1]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
 >> [2]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
 >> [3]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
 >> [4]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
 >> [5]:
 >>
 https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
 >


>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Benoît Paris
>  If anyone finds that blink planner has any significant defects and has a
larger regression than the old planner, please let us know.

Overall, the Blink-exclusive features are a must (TopN, deduplicate,
LAST_VALUE, plan reuse, etc.)! But not all production use cases of the legacy
planner are covered:
An edge case of Temporal Table Functions does not allow computed Tables (as
opposed to TableSources) to be used on the query side in Blink (
https://issues.apache.org/jira/browse/FLINK-14200)

Cheers
Ben


On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:

> +1, I have already made blink as the default planner of flink interpreter
> in Zeppelin
>
>
> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>
>> Hi Jark,
>>
>> +1 for default blink planner in SQL-CLI.
>> I believe this new planner can be put into practice in production.
>> We've worked hard for nearly a year, but the old planner didn't move on.
>>
>> And I'd like to cc user@flink.apache.org.
>> If anyone finds that the Blink planner has any significant defects or
>> regressions compared to the old planner, please let us know. We will be very
>> grateful.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>
>>> +1 for this.
>>> We bring many SQL/API features and stability enhancements in the 1.10 release,
>>> and almost all of them happen in the Blink planner.
>>> SQL CLI is the most convenient entrypoint for me; I believe many users
>>> will have a better experience if we set the Blink planner as the default planner.
>>>
>>> Best,
>>> Leonard
>>>
>>> > On Jan 3, 2020, at 15:16, Terry Wang wrote:
>>> >
>>> > Since what blink planner can do is a superset of flink planner, big +1
>>> for changing the default planner to Blink planner from my side.
>>> >
>>> > Best,
>>> > Terry Wang
>>> >
>>> >
>>> >
>>> >> On Jan 3, 2020, at 15:00, Jark Wu wrote:
>>> >>
>>> >> Hi everyone,
>>> >>
>>> >> In 1.10 release, Flink SQL supports many awesome features and
>>> improvements,
>>> >> including:
>>> >> - support watermark statement and computed column in DDL
>>> >> - fully support all data types in Hive
>>> >> - Batch SQL performance improvements (TPC-DS 7x faster than Hive MR)
>>> >> - support INSERT OVERWRITE and INSERT PARTITION
>>> >>
>>> >> However, all the features and improvements are only available in Blink
>>> >> planner, not in Old planner.
>>> >> There are also some other features are limited in Blink planner, e.g.
>>> >> Dimension Table Join [1],
>>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>>> so on.
>>> >>
>>> >> But the Old planner is still the default planner in Table API & SQL. It is
>>> >> frustrating for users to have to switch to the Blink planner manually every
>>> >> time they start a SQL CLI. And it's surprising to see an unsupported-feature
>>> >> exception when they try out the new features without switching the planner.
>>> >>
>>> >> SQL CLI is a very important entrypoint for users to try out new features
>>> >> and to prototype.
>>> >> In order to give the new planner more exposure, I would like to suggest
>>> >> setting the default planner for the SQL Client to the Blink planner before
>>> >> the 1.10 release.
>>> >>
>>> >> The approach is just changing the default SQL CLI yaml
>>> configuration[5]. In
>>> >> this way, the existing
>>> >> environment is still compatible and unaffected.
>>> >>
>>> >> Changing the default planner for the whole Table API & SQL is another
>>> topic
>>> >> and is out of scope of this discussion.
>>> >>
>>> >> What do you think?
>>> >>
>>> >> Best,
>>> >> Jark
>>> >>
>>> >> [1]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> >> [2]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>>> >> [3]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>> >> [4]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>>> >> [5]:
>>> >>
>>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>>> >
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Benoît Paris
Explainable Machine Learning Engineer
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Checkpoints issue and job failing

2020-01-03 Thread Congxian Qiu
Hi

Have you ever checked whether this problem also exists on Flink 1.9?

Best,
Congxian


On Fri, Jan 3, 2020 at 3:54 PM vino yang wrote:

> Hi Navneeth,
>
> Did you check whether the path contained in the exception really cannot be
> found?
>
> Best,
> Vino
>
> On Fri, Jan 3, 2020 at 8:23 AM Navneeth Krishnan wrote:
>
>> Hi All,
>>
>> We are running into checkpoint timeout issues more frequently in
>> production, and we also see the exception below. We are running Flink 1.4.0
>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>> this?
>>
>> [image: image.png]
>>
>> java.lang.IllegalStateException: Could not initialize operator state backend.
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.FileNotFoundException: 
>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>  (No such file or directory)
>>  at java.io.FileInputStream.open0(Native Method)
>>  at java.io.FileInputStream.open(FileInputStream.java:195)
>>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>  at 
>> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>>
>>
>> Thanks
>>
>>
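Not a direct answer, but for anyone checking their setup while reproducing this: a minimal sketch of pointing checkpoints at the NFS mount seen in the stack trace, using the pre-1.5-style APIs a Flink 1.4 job would use. The assumption here is that /mnt/checkpoints is the shared directory; if that path is not mounted identically on the JobManager and every TaskManager, restores can fail with exactly this kind of FileNotFoundException.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NfsCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        // Store checkpoint data on the shared NFS mount; the URI must resolve to the
        // same shared directory on the JobManager and on every TaskManager.
        env.setStateBackend(new FsStateBackend("file:///mnt/checkpoints"));

        // ... build the job topology here ...

        env.execute("job-with-nfs-checkpoints");
    }
}
```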


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jeff Zhang
+1, I have already made Blink the default planner of the Flink interpreter
in Zeppelin


On Fri, Jan 3, 2020 at 4:37 PM Jingsong Li wrote:

> Hi Jark,
>
> +1 for default blink planner in SQL-CLI.
> I believe this new planner can be put into practice in production.
> We've worked hard for nearly a year, but the old planner didn't move on.
>
> And I'd like to cc user@flink.apache.org.
> If anyone finds that the Blink planner has any significant defects or
> regressions compared to the old planner, please let us know. We will be very
> grateful.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>
>> +1 for this.
>> We bring many SQL/API features and stability enhancements in the 1.10 release,
>> and almost all of them happen in the Blink planner.
>> SQL CLI is the most convenient entrypoint for me; I believe many users
>> will have a better experience if we set the Blink planner as the default planner.
>>
>> Best,
>> Leonard
>>
>> > On Jan 3, 2020, at 15:16, Terry Wang wrote:
>> >
>> > Since what blink planner can do is a superset of flink planner, big +1
>> for changing the default planner to Blink planner from my side.
>> >
>> > Best,
>> > Terry Wang
>> >
>> >
>> >
>> >> On Jan 3, 2020, at 15:00, Jark Wu wrote:
>> >>
>> >> Hi everyone,
>> >>
>> >> In 1.10 release, Flink SQL supports many awesome features and
>> improvements,
>> >> including:
>> >> - support watermark statement and computed column in DDL
>> >> - fully support all data types in Hive
>> >> - Batch SQL performance improvements (TPC-DS 7x faster than Hive MR)
>> >> - support INSERT OVERWRITE and INSERT PARTITION
>> >>
>> >> However, all the features and improvements are only available in Blink
>> >> planner, not in Old planner.
>> >> There are also some other features are limited in Blink planner, e.g.
>> >> Dimension Table Join [1],
>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>> so on.
>> >>
>> >> But the Old planner is still the default planner in Table API & SQL. It is
>> >> frustrating for users to have to switch to the Blink planner manually every
>> >> time they start a SQL CLI. And it's surprising to see an unsupported-feature
>> >> exception when they try out the new features without switching the planner.
>> >>
>> >> SQL CLI is a very important entrypoint for users to try out new features
>> >> and to prototype.
>> >> In order to give the new planner more exposure, I would like to suggest
>> >> setting the default planner for the SQL Client to the Blink planner before
>> >> the 1.10 release.
>> >>
>> >> The approach is just changing the default SQL CLI yaml
>> configuration[5]. In
>> >> this way, the existing
>> >> environment is still compatible and unaffected.
>> >>
>> >> Changing the default planner for the whole Table API & SQL is another
>> topic
>> >> and is out of scope of this discussion.
>> >>
>> >> What do you think?
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> >> [2]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>> >> [3]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>> >> [4]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>> >> [5]:
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang
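As a concrete illustration of one of the Blink-only features referenced above (Top-N, [2]): a minimal sketch of the documented ROW_NUMBER pattern. The `shop_sales` table and its columns are hypothetical and assumed to be registered elsewhere.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TopNSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Top-N queries are only recognized by the Blink planner, hence the explicit setting.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Continuously maintained top 5 products per category, ranked by sales.
        Table top5 = tEnv.sqlQuery(
                "SELECT category, product, sales FROM ("
              + "  SELECT *, ROW_NUMBER() OVER ("
              + "    PARTITION BY category ORDER BY sales DESC) AS row_num"
              + "  FROM shop_sales"
              + ") WHERE row_num <= 5");

        // ... emit 'top5' to a retract/upsert sink and call env.execute() ...
    }
}
```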


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jingsong Li
Hi Jark,

+1 for default blink planner in SQL-CLI.
I believe this new planner can be put into practice in production.
We've worked hard for nearly a year, but the old planner didn't move on.

And I'd like to cc user@flink.apache.org.
If anyone finds that the Blink planner has any significant defects or
regressions compared to the old planner, please let us know. We will be very
grateful.

Best,
Jingsong Lee
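For users who want to opt in before any default changes, a minimal sketch of selecting the Blink planner programmatically in the Table API (the SQL Client equivalent is the planner entry in sql-client-defaults.yaml, referenced as [5] in the quoted mail below):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class UseBlinkPlanner {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicitly select the Blink planner; without this, 1.9/1.10 programs
        // default to the old planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // tEnv is now backed by the Blink planner; register tables and run queries as usual.
    }
}
```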

On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:

> +1 for this.
> We bring many SQL/API features and stability enhancements in the 1.10 release,
> and almost all of them happen in the Blink planner.
> SQL CLI is the most convenient entrypoint for me; I believe many users
> will have a better experience if we set the Blink planner as the default planner.
>
> Best,
> Leonard
>
> > On Jan 3, 2020, at 15:16, Terry Wang wrote:
> >
> > Since what blink planner can do is a superset of flink planner, big +1
> for changing the default planner to Blink planner from my side.
> >
> > Best,
> > Terry Wang
> >
> >
> >
> >> On Jan 3, 2020, at 15:00, Jark Wu wrote:
> >>
> >> Hi everyone,
> >>
> >> In 1.10 release, Flink SQL supports many awesome features and
> improvements,
> >> including:
> >> - support watermark statement and computed column in DDL
> >> - fully support all data types in Hive
> >> - Batch SQL performance improvements (TPC-DS 7x faster than Hive MR)
> >> - support INSERT OVERWRITE and INSERT PARTITION
> >>
> >> However, all the features and improvements are only available in Blink
> >> planner, not in Old planner.
> >> There are also some other features are limited in Blink planner, e.g.
> >> Dimension Table Join [1],
> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
> so on.
> >>
> >> But the Old planner is still the default planner in Table API & SQL. It is
> >> frustrating for users to have to switch to the Blink planner manually every
> >> time they start a SQL CLI. And it's surprising to see an unsupported-feature
> >> exception when they try out the new features without switching the planner.
> >>
> >> SQL CLI is a very important entrypoint for users to try out new features
> >> and to prototype.
> >> In order to give the new planner more exposure, I would like to suggest
> >> setting the default planner for the SQL Client to the Blink planner before
> >> the 1.10 release.
> >>
> >> The approach is just changing the default SQL CLI yaml
> configuration[5]. In
> >> this way, the existing
> >> environment is still compatible and unaffected.
> >>
> >> Changing the default planner for the whole Table API & SQL is another
> topic
> >> and is out of scope of this discussion.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> [2]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
> >> [3]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> >> [4]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
> >> [5]:
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
> >
>
>

-- 
Best, Jingsong Lee
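
Finally, for concreteness, a sketch of the two DDL features highlighted in this thread, a computed column and a WATERMARK statement, registered through the 1.10 Table API. The table name, fields, and connector properties are placeholders and would need to match a real connector setup:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class WatermarkDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // DDL with a computed column (amount_cents) and a watermark declared on ts.
        // The WITH clause is illustrative only; substitute the properties of your connector.
        tEnv.sqlUpdate(
                "CREATE TABLE user_events ("
              + "  user_id BIGINT,"
              + "  amount DOUBLE,"
              + "  ts TIMESTAMP(3),"
              + "  amount_cents AS CAST(amount * 100 AS BIGINT),"
              + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
              + ") WITH ("
              + "  'connector.type' = 'kafka',"
              + "  'connector.version' = 'universal',"
              + "  'connector.topic' = 'user_events',"
              + "  'connector.properties.bootstrap.servers' = 'localhost:9092',"
              + "  'format.type' = 'json'"
              + ")");

        // The computed column and the event-time attribute ts are now available in
        // queries, e.g. windowed aggregations on ts.
    }
}
```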