[jira] [Created] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-03 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-24148:
--

 Summary: Add bloom filter policy option in RocksDBConfiguredOptions
 Key: FLINK-24148
 URL: https://issues.apache.org/jira/browse/FLINK-24148
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.13.2, 1.14.1
Reporter: Jiayi Liao


A Bloom filter can significantly speed up reads, especially lookups that have to 
go through L0 files (see 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter for more details).
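To illustrate why this helps, here is a toy Bloom filter. ToyBloomFilter is a hypothetical class, not RocksDB's implementation, which uses its own hash functions and bit layout. Because L0 files can overlap in key range, a point lookup may have to probe every L0 file; with a per-file filter, the read can skip any file whose filter answers "definitely absent".

```java
import java.util.BitSet;

/**
 * Toy Bloom filter for illustration only. A negative answer is always
 * correct, so a lookup can skip the file; a positive answer may be a
 * false positive and still requires reading the file.
 */
class ToyBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    ToyBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Double hashing: the i-th probe position is h1 + i * h2 (mod numBits).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1;  // cheap derived second hash, forced odd
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;  // definitely not in this file: skip it
            }
        }
        return true;  // possibly in this file: it has to be read
    }
}
```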



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23690) Processing timers can be triggered more efficiently

2021-08-09 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23690:
--

 Summary: Processing timers can be triggered more efficiently
 Key: FLINK-23690
 URL: https://issues.apache.org/jira/browse/FLINK-23690
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.13.2, 1.12.5, 1.11.4, 1.14.0
Reporter: Jiayi Liao


After FLINK-23208, processing timers are triggered more efficiently, but there is 
still room for improvement (the performance can be measured with the 
[benchmark|https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java]).

Currently, {{InternalTimerService.onProcessingTime(long)}} polls a timer from 
{{processingTimeTimersQueue}} and registers a new timer after the polled timer 
is triggered, which means that timers with different timestamps are registered 
multiple times. This can be improved with the code below: 

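{code:java}
// Drain every timer that is already due in one pass. Subtracting 1 ms keeps
// the semantics of firing a timer only after its timestamp has passed.
long now = System.currentTimeMillis() - 1;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null
        && timer.getTimestamp() <= now) {
    processingTimeTimersQueue.poll();
    keyContext.setCurrentKey(timer.getKey());
    triggerTarget.onProcessingTime(timer);
}
{code}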

However, because of the bug described in FLINK-23689, this change conflicts with 
the current implementation of {{TestProcessingTimeService.setCurrentTime(long)}}, 
which causes many tests to fail (e.g. {{InternalTimerServiceImplTest}}). 
Therefore, FLINK-23689 should be fixed before working on this improvement.





[jira] [Created] (FLINK-23689) TestProcessingTimeService.setCurrentTime(long) not delay the firing timer by 1ms delay

2021-08-09 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23689:
--

 Summary: TestProcessingTimeService.setCurrentTime(long) not delay the firing timer by 1ms delay
 Key: FLINK-23689
 URL: https://issues.apache.org/jira/browse/FLINK-23689
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task, Tests
Affects Versions: 1.13.2, 1.12.4, 1.11.4, 1.14.0
Reporter: Jiayi Liao


FLINK-9857 made {{SystemProcessingTimeService}} fire processing timers with a 
1 ms delay, but it did not change {{TestProcessingTimeService}}, so 
{{TestProcessingTimeService.setCurrentTime}} can still trigger processing 
timers whose timestamp equals {{currentTime}}: 

{code:java}
while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) {
..
}
{code}

The problem could simply be fixed with {{currentTime > priorityQueue.peek().f0}}, 
but that would break too many existing tests: 

* Tests using {{TestProcessingTimeService.setCurrentTime(long)}} (64 usages)
* Tests using {{AbstractStreamOperatorTestHarness.setProcessingTime(long)}} 
(368 usages)

Tip: some tests can be fixed by adding 1 ms to the argument of 
{{TestProcessingTimeService.setCurrentTime(long)}}, but this doesn't work for 
tests that invoke {{setCurrentTime(long)}} several times.
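To make the proposed strict semantics concrete, here is a minimal sketch of a manually driven time service in which a timer registered for timestamp T fires only once the clock is strictly greater than T. ManualTimeService is hypothetical and much simpler than Flink's TestProcessingTimeService: timers carry only a timestamp, no callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical manual clock that fires timers with the 1 ms delay semantics. */
class ManualTimeService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();
    private long currentTime = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    void setCurrentTime(long time) {
        currentTime = time;
        // Strict '>' instead of '>=': a timer for T is not fired at T itself.
        while (!timers.isEmpty() && currentTime > timers.peek()) {
            fired.add(timers.poll());
        }
    }

    List<Long> firedTimers() {
        return fired;
    }
}
```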






Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-08 Thread Jiayi Liao
Hi Yun,

Thanks for your detailed description of the progress in the Flink and
RocksDB communities. There are more than 1,200 jobs using RocksDB as the
state backend at Bytedance, and we have indeed met several of the problems
mentioned in the JIRA issues you referred to:

(1) Memory management: for large-scale jobs (10TB+ state), it's hard to tune
memory usage because RocksDB's memory control is not strict. Currently we
have to estimate the memory usage manually based on RocksDB's wiki, which
increases our maintenance cost a lot.
(2) DeleteRange support: we ran a few benchmarks on rescaling performance
and found that it takes up to a few minutes when a task's state is larger
than 10GB. I'm glad to see such improvements being merged after upgrading
the RocksDB version.
(3) ARM support: we added support for the ARM platform ourselves last year by
hacking on the code, and it's great to see that RocksDB now has an official
release for ARM.

I think the new features (and bug fixes) are more important for us, so I'm +1
for this.


Best,
Jiayi Liao

On Thu, Aug 5, 2021 at 1:50 AM Yun Tang  wrote:

> Hi Yuval,
>
> Upgrading the RocksDB version has been a long story since Flink-1.10.
> When we first planned to introduce the write buffer manager to help control
> the memory usage of RocksDB, we actually wanted to bump the current
> RocksDB-5.17 to RocksDB-5.18. However, we found a performance regression in
> our micro benchmark on state operations [1] after bumping to RocksDB-5.18. We
> did not figure out the root cause at that time and decided to cherry-pick the
> write buffer manager commits into our own FRocksDB [2]. We finally released
> our own frocksdbjni-5.17.2-artisans-2.0 at that time.
>
> As time goes on, more and more bugs and missing features have been reported
> in the old RocksDB version, such as:
>
>1. Cannot support ARM platform [3]
>2. Does not have a stable deleteRange API, which is useful for Flink scale-out [4]
>3. Cannot support strict block cache [5]
>4. Checkpoints might get stuck when using the UNIVERSAL compaction strategy [6]
>5. Uncontrolled log size made us disable the RocksDB internal LOG [7]
>6. RocksDB's optimizeForPointLookup option might cause data loss [8]
>7. The dummy entry currently used for memory control in RocksDB-5.17 is too large, leading to performance problems [9]
>8. Cannot support alpine-based images.
>9. ...
>
> Some of the bugs have been worked around, and some are still open.
>
> So we decided to make some changes starting from Flink-1.12. First of all, we
> reported the performance regression between RocksDB-5.18 and RocksDB-5.17 to
> the RocksDB community [10]. However, as RocksDB-5.x versions are a bit old for
> the community, and RocksJava usage might not be a core concern for the
> Facebook developers, we did not get useful replies. Thus, we decided to figure
> out the root cause of the performance regression ourselves. Fortunately, we
> found the cause by binary-searching the commits between RocksDB-5.17 and
> RocksDB-5.18, and posted an update in the original thread [10]. In short, the
> performance regression is due to the different implementations of `__thread`
> and `thread_local` in gcc, which have more impact on dynamic loading [11], and
> dynamic loading is what the current RocksJava jar package does. With my patch
> [12], the performance regression disappears when comparing RocksDB-5.18 with
> RocksDB-5.17.
>
> Unfortunately, RocksDB-5.18 still has many bugs, and we want to bump to
> RocksDB-6.x. However, another performance regression appeared even with my
> patch [12]. From previous experience, we knew that we must verify the built
> .so files with our Java-based benchmark instead of RocksDB's built-in
> db-bench. I started to search the 1340+ commits from RocksDB-5.18 to
> RocksDB-6.11 to find the performance problem. However, this time I did not
> find the root cause even after spending several weeks. The performance goes
> up and down across those commits, and I could not pin down *the commit* that
> led to the performance regression. Take the commit integrating the block
> cache tracer into the block-based table reader [13] as an example: I noticed
> that this commit caused a slight performance regression, possibly due to
> useless usage accounting in operations; however, the problematic code was
> changed in later commits. Thus, after several weeks of digging, I had to
> temporarily give up the endless search through thousands of commits. As the
> RocksDB community does not seem to make its project management system public,
> unlike Apache's open JIRA system, we do not know what benchmarks they
> actually run before releasing each version to guarantee performance.
>
> With my patch [10] on the latest RocksDB-6.20.3, we could get the results on
> nexmark in the original thread sent by Stephan, and we can see 

Re: [ANNOUNCE] New Apache Flink Committer - Yuan Mei

2021-07-08 Thread Jiayi Liao
Congratulations Yuan!

Best,
Jiayi Liao

On Thu, Jul 8, 2021 at 3:55 PM Roman Khachatryan  wrote:

> Congratulations Yuan!
>
> Regards,
> Roman
>
> On Thu, Jul 8, 2021 at 6:02 AM Yang Wang  wrote:
> >
> > Congratulations Yuan!
> >
> > Best,
> > Yang
> >
> > On Thu, Jul 8, 2021 at 11:46 AM XING JIN wrote:
> >
> > > Congratulations Yuan~!
> > >
> > > On Thu, Jul 8, 2021 at 11:28 AM Roc Marshal wrote:
> > >
> > > > Congratulations, Yuan!
> > > >
> > > >
> > > > At 2021-07-08 01:21:40, "Yu Li"  wrote:
> > > > >Hi all,
> > > > >
> > > > >On behalf of the PMC, I’m very happy to announce Yuan Mei as a new Flink
> > > > >committer.
> > > > >
> > > > >Yuan has been an active contributor for more than two years, with code
> > > > >contributions on multiple components including kafka connectors,
> > > > >checkpointing, state backends, etc. Besides, she has been actively involved
> > > > >in community activities such as helping manage releases, discussing
> > > > >questions on dev@list, supporting users and giving talks at conferences.
> > > > >
> > > > >Please join me in congratulating Yuan for becoming a Flink committer!
> > > > >
> > > > >Cheers,
> > > > >Yu
> > > >
> > >
>


Re: [ANNOUNCE] New PMC member: Guowei Ma

2021-07-06 Thread Jiayi Liao
Congratulations!

Best,
Jiayi Liao


On Tue, Jul 6, 2021 at 10:01 PM Kurt Young  wrote:

> Hi all!
>
> I'm very happy to announce that Guowei Ma has joined the Flink PMC!
>
> Congratulations and welcome Guowei!
>
> Best,
> Kurt
>


[jira] [Created] (FLINK-23208) Late processing timers need to wait 1ms at least to be fired

2021-07-01 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23208:
--

 Summary: Late processing timers need to wait 1ms at least to be fired
 Key: FLINK-23208
 URL: https://issues.apache.org/jira/browse/FLINK-23208
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.0
Reporter: Jiayi Liao


The problem is from the codes below:

{code:java}
public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {

    // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
    // T says we won't see elements in the future with a timestamp smaller or equal to T.
    // With processing time, we therefore need to delay firing the timer by one ms.
    return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
}
{code}

Assume a Flink job registers 1 timer per millisecond and is able to consume 
1 timer/ms. Here is what happens: 

* Timestamp1 (1st ms): timer1 is registered and will be triggered at Timestamp2.
* Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
* Timestamp3 (3rd ms): timer3 is registered and timer1 is consumed; after this, 
{{InternalTimerServiceImpl}} registers the next timer, timer2, which will be 
triggered at Timestamp4 (it waits at least 1 ms).
* Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
* Timestamp5 (5th ms): timer5 is registered and timer2 is consumed; after this, 
{{InternalTimerServiceImpl}} registers the next timer, timer3, which will be 
triggered at Timestamp6 (it waits at least 1 ms).

As we can see, although the job is able to consume 1 timer/ms, it actually 
consumes only 0.5 timer/ms. 
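The arithmetic above can be reproduced with a toy discrete-time model. LateTimerSimulation is hypothetical; it assumes, as in the scenario, that a late timer is always pending, that consuming a fired timer takes exactly 1 ms, and that the next physical timer is then scheduled with the getProcessingTimeDelay formula quoted above.

```java
/** Toy model of the late-timer scenario; not Flink code. */
class LateTimerSimulation {

    // Same formula as the getProcessingTimeDelay snippet in this issue.
    static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    /** Returns how many timers have been consumed by time {@code horizonMs}. */
    static int consumedWithin(long horizonMs) {
        long registeredAt = 1;                 // timer1 is registered at the 1st ms
        long nextTimerTimestamp = registeredAt;
        long fireAt = registeredAt + getProcessingTimeDelay(nextTimerTimestamp, registeredAt);
        int consumed = 0;
        while (fireAt + 1 <= horizonMs) {
            long now = fireAt + 1;             // consuming the fired timer takes 1 ms
            consumed++;
            nextTimerTimestamp++;              // the next pending timer, already late
            fireAt = now + getProcessingTimeDelay(nextTimerTimestamp, now);
        }
        return consumed;
    }
}
```

With these assumptions, timers are consumed at the 3rd, 5th, 7th ms and so on, so consumedWithin(1000) returns 499, i.e. roughly 0.5 timer/ms.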





[jira] [Created] (FLINK-23180) Initialize checkpoint location lazily in DataStream Batch Jobs

2021-06-29 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23180:
--

 Summary: Initialize checkpoint location lazily in DataStream Batch Jobs
 Key: FLINK-23180
 URL: https://issues.apache.org/jira/browse/FLINK-23180
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Jiayi Liao


Currently, batch jobs initialize the checkpoint location eagerly when the 
{{CheckpointCoordinator}} is created, which creates lots of useless 
directories on the distributed filesystem. 
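A minimal sketch of lazy initialization with java.nio. LazyCheckpointLocation is a hypothetical helper, not part of Flink's checkpoint storage API: the constructor only remembers the path, and the directory is created on the first call to get(), so a job that never checkpoints creates nothing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical lazily created checkpoint directory. */
class LazyCheckpointLocation {
    private final Path checkpointDir;
    private boolean initialized;

    LazyCheckpointLocation(Path checkpointDir) {
        this.checkpointDir = checkpointDir;  // nothing is created here
    }

    synchronized Path get() {
        if (!initialized) {
            try {
                Files.createDirectories(checkpointDir);  // first real use only
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            initialized = true;
        }
        return checkpointDir;
    }
}
```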





Re: [ANNOUNCE] New PMC member: Xintong Song

2021-06-16 Thread Jiayi Liao
>
> 
> Congratulations Xintong!
>
> On Wed, Jun 16, 2021 at 7:24 PM Nicholas Jiang 
> wrote:
>
>> Congratulations, Xintong!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
>


[jira] [Created] (FLINK-22887) Backlog based optimizations for RebalancePartitioner and RescalePartitioner

2021-06-05 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-22887:
--

 Summary: Backlog based optimizations for RebalancePartitioner and RescalePartitioner
 Key: FLINK-22887
 URL: https://issues.apache.org/jira/browse/FLINK-22887
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.13.1
Reporter: Jiayi Liao


{{RebalancePartitioner}} distributes records round-robin, but this may not work 
as expected because the environments and processing capacity of the downstream 
tasks may differ. In such cases, the throughput of the whole job is limited by 
the slowest downstream subtask, much like in the "HASH" scenario.

Instead, now that the credit-based mechanism has been introduced, we can use the 
{{backlog}} on the sender side to estimate the "load" on each receiver, which 
helps us distribute data more fairly in {{RebalancePartitioner}} and 
{{RescalePartitioner}}. 
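A hypothetical backlog-aware channel selector, sketched without Flink's partitioner interfaces. Here backlogs[i] stands for the number of buffers queued on the sender side for downstream channel i; each record goes to the least loaded channel, and ties are broken round-robin so that equal loads still spread evenly.

```java
/** Hypothetical selector: least backlog wins, ties resolved round-robin. */
class BacklogAwareSelector {
    private int nextStart = 0;

    /** Assumes at least one channel. */
    int selectChannel(int[] backlogs) {
        int n = backlogs.length;
        int best = nextStart % n;
        for (int i = 1; i < n; i++) {
            int candidate = (nextStart + i) % n;
            if (backlogs[candidate] < backlogs[best]) {
                best = candidate;
            }
        }
        nextStart = (best + 1) % n;  // start the next scan after the chosen channel
        return best;
    }
}
```

With equal backlogs this degenerates to plain round-robin, so it only departs from the current behaviour when some receiver actually falls behind.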






[jira] [Created] (FLINK-22886) Thread leak in RocksDBStateUploader

2021-06-05 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-22886:
--

 Summary: Thread leak in RocksDBStateUploader
 Key: FLINK-22886
 URL: https://issues.apache.org/jira/browse/FLINK-22886
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.12.4, 1.13.1, 1.11.3
Reporter: Jiayi Liao


The {{ExecutorService}} in {{RocksDBStateUploader}} is never shut down, which may 
leak threads when tasks fail. 

BTW, we should also name the threads of this {{ExecutorService}}; otherwise, all 
we see in the stack dump is a lot of threads named "pool-{x}-thread-{y}".
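A minimal JDK-only sketch of both points. NamedUploader and the thread-name prefix are hypothetical and do not reflect Flink's actual uploader code: a ThreadFactory gives threads a descriptive name, and close() shuts the pool down so a failed task does not leave its threads behind.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical uploader skeleton with named threads and explicit shutdown. */
class NamedUploader implements AutoCloseable {
    private final ExecutorService executor;

    NamedUploader(String namePrefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            // e.g. "rocksdb-state-uploader-1" instead of "pool-3-thread-1"
            Thread t = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        this.executor = Executors.newFixedThreadPool(threads, factory);
    }

    ExecutorService executor() {
        return executor;
    }

    @Override
    public void close() {
        executor.shutdownNow();  // interrupts running uploads, drops queued ones
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```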






[jira] [Created] (FLINK-21413) TtlMapState and TtlListState cannot be clean completely with Filesystem StateBackend

2021-02-18 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-21413:
--

 Summary: TtlMapState and TtlListState cannot be clean completely with Filesystem StateBackend
 Key: FLINK-21413
 URL: https://issues.apache.org/jira/browse/FLINK-21413
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Take the #TtlMapState as an example,

 
{code:java}
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
    Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
    TypeSerializer<TtlValue<UV>> valueSerializer =
            ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
    for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
        if (!expired(e.getValue())) {
            // we have to do the defensive copy to update the value
            unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
        }
    }
    return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
{code}
 

The returned value is never null, so the #StateEntry exists forever, which leads 
to a memory leak if the key range of the stream is very large. Below we can see 
that more than 20 million uncleared TtlStateMap entries can take up several GB 
of memory.

 

!image-2021-02-19-11-13-38-691.png!
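One possible direction, sketched with simplified stand-in types: TtlMapCleanup and its nested TtlValue are hypothetical, not Flink classes, and the serializer and defensive copy are omitted. The method returns null once every entry has expired, assuming the caller treats null as "remove the whole state entry". The expiry rule used here (expired once lastAccess + ttl <= now) is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of getUnexpiredOrNull that can actually return null. */
class TtlMapCleanup {

    /** Minimal stand-in for a TTL wrapper: a value plus its last access time. */
    static final class TtlValue<V> {
        final V value;
        final long lastAccessTimestamp;

        TtlValue(V value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    // Assumed expiry rule: expired once lastAccess + ttl <= now.
    static boolean expired(TtlValue<?> v, long now, long ttlMillis) {
        return v.lastAccessTimestamp + ttlMillis <= now;
    }

    static <K, V> Map<K, TtlValue<V>> getUnexpiredOrNull(
            Map<K, TtlValue<V>> ttlValue, long now, long ttlMillis) {
        Map<K, TtlValue<V>> unexpired = new HashMap<>();
        for (Map.Entry<K, TtlValue<V>> e : ttlValue.entrySet()) {
            if (!expired(e.getValue(), now, ttlMillis)) {
                unexpired.put(e.getKey(), e.getValue());
            }
        }
        if (unexpired.isEmpty()) {
            return null;  // nothing left: let the caller drop the state entry
        }
        return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
    }
}
```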





[jira] [Created] (FLINK-20684) Add compression type option in RocksDBConfigurableOptions

2020-12-20 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-20684:
--

 Summary: Add compression type option in RocksDBConfigurableOptions
 Key: FLINK-20684
 URL: https://issues.apache.org/jira/browse/FLINK-20684
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.11.3
Reporter: Jiayi Liao


Currently RocksDB uses Snappy compression by default, which can cost a lot of 
CPU time, and in most cases we may prefer spending more disk space rather than 
more CPU. 

 

 





[jira] [Created] (FLINK-20269) The flush happens too frequent in SavepoinV2Serializer

2020-11-21 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-20269:
--

 Summary: The flush happens too frequent in SavepoinV2Serializer
 Key: FLINK-20269
 URL: https://issues.apache.org/jira/browse/FLINK-20269
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.0
Reporter: Jiayi Liao


I noticed this because metadata persistence can be very slow, especially when 
the network is unstable, and almost every time I dump the stack of the process, 
the bottleneck is the HDFS client waiting for DataNodes' acks while persisting 
the metadata.

I wonder: is it really necessary to flush the stream after serializing every 
{{StreamStateHandle}}?
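To make the cost concrete, here is a toy comparison. FlushCountingStream and MetadataWriter are hypothetical, and the "handles" are plain strings rather than real StreamStateHandle serializations; the code only counts how many flush() calls reach the underlying stream. If each flush on the remote stream waits for DataNode acks, as observed above, flushing per handle pays that wait once per handle instead of once per metadata file.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;

/** Counts flush() calls that reach the wrapped stream. */
class FlushCountingStream extends FilterOutputStream {
    int flushes;

    FlushCountingStream(OutputStream out) {
        super(out);
    }

    @Override
    public void flush() throws IOException {
        flushes++;
        super.flush();
    }
}

/** Hypothetical metadata writer; each string stands in for one serialized handle. */
class MetadataWriter {
    static int writeAndCountFlushes(List<String> handles, boolean flushPerHandle) {
        FlushCountingStream counting = new FlushCountingStream(new ByteArrayOutputStream());
        DataOutputStream out = new DataOutputStream(counting);
        try {
            for (String handle : handles) {
                out.writeUTF(handle);
                if (flushPerHandle) {
                    out.flush();  // one potential round trip per handle
                }
            }
            out.flush();  // a single flush once all metadata is written
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is deliberately not closed: close() would flush again.
        return counting.flushes;
    }
}
```

For three handles, writeAndCountFlushes returns 4 with per-handle flushing and 1 without it.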





[jira] [Created] (FLINK-20087) CheckpointCoordinator waits until all tasks finish initialization of states to trigger checkpoint

2020-11-11 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-20087:
--

 Summary: CheckpointCoordinator waits until all tasks finish initialization of states to trigger checkpoint
 Key: FLINK-20087
 URL: https://issues.apache.org/jira/browse/FLINK-20087
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.0
Reporter: Jiayi Liao


{{CheckpointCoordinator}} starts triggering checkpoints once all tasks have 
reported RUNNING status to the JobMaster. However, {{initializeState}} can be 
time-consuming (a few minutes), for example when a rescaled task has to restore 
RocksDB data from multiple old tasks. Triggering checkpoints during this period 
is pointless. 
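A hypothetical gate illustrating the idea. StateInitializationGate is not part of Flink's CheckpointCoordinator, and task IDs are plain strings here: triggering is allowed only after every task has reported that its state initialization finished, rather than as soon as all tasks are RUNNING.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical gate: checkpoints wait for every task's initializeState(). */
class StateInitializationGate {
    private final Set<String> pending = new HashSet<>();

    StateInitializationGate(Set<String> allTaskIds) {
        pending.addAll(allTaskIds);
    }

    /** Called when a task reports that state initialization has finished. */
    synchronized void onStateInitialized(String taskId) {
        pending.remove(taskId);
    }

    synchronized boolean canTriggerCheckpoint() {
        return pending.isEmpty();
    }
}
```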





[jira] [Created] (FLINK-20044) Disposal of RocksDB could last forever

2020-11-07 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-20044:
--

 Summary: Disposal of RocksDB could last forever
 Key: FLINK-20044
 URL: https://issues.apache.org/jira/browse/FLINK-20044
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.0
Reporter: Jiayi Liao


The task cannot fail because it is stuck disposing RocksDB, which also affects 
the whole job. I have seen this several times in recent months, mostly caused by 
broken disks, but I think Flink should also handle this case more gracefully.
{code:java}
"LookUp_Join -> Sink_Unnamed (898/1777)- execution # 4" #411 prio=5 os_prio=0 
tid=0x7fc9b0286800 nid=0xff6fc runnable [0x7fc966cfc000]
   java.lang.Thread.State: RUNNABLE
        at org.rocksdb.RocksDB.disposeInternal(Native Method)
        at org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
        at org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:57)
        at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:349)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:371)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:124)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:618)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:517)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:733)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:539)
        at java.lang.Thread.run(Thread.java:748)

{code}
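One possible mitigation, sketched with plain JDK executors. BoundedDisposal and disposeWithTimeout are hypothetical names: the blocking disposal runs on a separate daemon thread, and the caller stops waiting after a timeout so the task thread can proceed with failing over. This does not unblock the native call itself; the stuck thread is abandoned, and whatever it holds stays leaked until the process exits.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper bounding how long a task waits for a disposal. */
class BoundedDisposal {

    /** Returns true if dispose completed normally within the timeout. */
    static boolean disposeWithTimeout(Runnable dispose, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "rocksdb-dispose");
            t.setDaemon(true);  // a stuck native call must not keep the JVM alive
            return t;
        });
        try {
            executor.submit(dispose).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;  // still hanging or failed: let the task fail over anyway
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();  // interrupts the worker; a native call may ignore it
        }
    }
}
```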





[jira] [Created] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-12 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-19596:
--

 Summary: Do not recover CompletedCheckpointStore on each failover
 Key: FLINK-19596
 URL: https://issues.apache.org/jira/browse/FLINK-19596
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.11.2
Reporter: Jiayi Liao


{{completedCheckpointStore.recover()}} in 
{{restoreLatestCheckpointedStateInternal}} can be a bottleneck during failover, 
because the {{CompletedCheckpointStore}} has to read files from HDFS to 
instantiate the {{CompletedCheckpoint}} instances.

The impact is significant in our case:

* Jobs with high parallelism (no shuffle) that transfer data from Kafka to 
other filesystems.
* If a machine goes down, several containers and tens of tasks are affected, 
which means {{completedCheckpointStore.recover()}} is called tens of times, 
since the tasks are not in the same failover region.
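A hypothetical sketch of "recover once, then reuse". RecoverOnceStore is not Flink's CompletedCheckpointStore, and checkpoints are plain path strings: the expensive load from the DFS runs on the first recovery only, and later regional failovers reuse the in-memory list, which is kept up to date as new checkpoints complete. This is only safe under the assumption that nothing else modifies the store between failovers (for example, no JobManager failover in between).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical store wrapper that reads checkpoint metadata from the DFS once. */
class RecoverOnceStore {
    private final Supplier<List<String>> loadFromDfs;
    private List<String> checkpoints;  // null until the first recovery

    RecoverOnceStore(Supplier<List<String>> loadFromDfs) {
        this.loadFromDfs = loadFromDfs;
    }

    synchronized List<String> recover() {
        if (checkpoints == null) {
            checkpoints = new ArrayList<>(loadFromDfs.get());  // the only DFS read
        }
        return Collections.unmodifiableList(checkpoints);
    }

    synchronized void addCheckpoint(String checkpointPath) {
        recover();
        checkpoints.add(checkpointPath);
    }
}
```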





[jira] [Created] (FLINK-19150) Behaviour change after migration from 1.9 to 1.11

2020-09-07 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-19150:
--

 Summary: Behaviour change after migration from 1.9 to 1.11
 Key: FLINK-19150
 URL: https://issues.apache.org/jira/browse/FLINK-19150
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.11.0
Reporter: Jiayi Liao


In Flink 1.9, if we submit a job in attached mode, the client shuts down the 
cluster at the end. In Flink 1.11, however, the cluster is shut down as soon as 
the job result is requested, no matter where the request comes from.

We have found that the client sometimes fails to get the job execution result 
and reports a connection-lost error, because the result has already been 
requested and the cluster has been shut down. The root cause is under 
investigation, but we think it might be related to the network environment, 
since it occurs with very low probability.

I'm aware that job submission was heavily restructured between Flink 1.9 and 
1.11. But is this change expected?





[jira] [Created] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts

2020-08-27 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-19069:
--

 Summary: finalizeOnMaster takes too much time and client timeouts
 Key: FLINK-19069
 URL: https://issues.apache.org/jira/browse/FLINK-19069
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Currently, we execute {{finalizeOnMaster}} in the JM's main thread, which may 
block the JM for a very long time and eventually cause the client to time out. 

For example, we write data to HDFS and commit the files on the JM, and 
committing tens of thousands of files takes more than ten minutes.
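A minimal JDK-only sketch of moving the slow step off the main thread. AsyncFinalizer, finalizeAsync and markFinished are hypothetical names, not Flink's JobMaster API: the finalize step (e.g. committing many files) runs on an I/O executor, and only the final status transition is handed back to the main-thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical helper running finalization outside the main thread. */
class AsyncFinalizer {
    static CompletableFuture<Void> finalizeAsync(
            Runnable finalizeOnMaster,
            Executor ioExecutor,
            Executor mainThreadExecutor,
            Runnable markFinished) {
        return CompletableFuture
                .runAsync(finalizeOnMaster, ioExecutor)           // slow commit work
                .thenRunAsync(markFinished, mainThreadExecutor);  // quick state change
    }
}
```

The main thread stays free to answer client requests while the commit is running; it only executes markFinished once the I/O work has completed.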





[jira] [Created] (FLINK-19016) Checksum mismatch when restore from RocksDB

2020-08-21 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-19016:
--

 Summary: Checksum mismatch when restore from RocksDB
 Key: FLINK-19016
 URL: https://issues.apache.org/jira/browse/FLINK-19016
 Project: Flink
  Issue Type: Bug
Reporter: Jiayi Liao


The error stack trace below:

{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedMapBundleOperator_44cfc1ca74b40bb44eed1f38f72b3ea9_(71/300) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        ... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:580)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 8 more
Caused by: java.io.IOException: Error while opening RocksDB instance.
        at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
        at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:277)
        ... 12 more
Caused by: org.rocksdb.RocksDBException: checksum mismatch
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:286)
        at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
        ... 18 more
{code}

The machine went down because of a hardware problem, and after that the job 
could not restart successfully anymore. After digging a little, I found that 
RocksDB in Flink uses sync instead of fsync to synchronize data with the disk. 
With sync, RocksDB cannot guarantee that the current in-progress file is 
persisted on disk in {{takeDBNativeCheckpoint}}.
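For illustration only, the durability guarantee in question can be shown with plain Java NIO rather than RocksDB. DurableWrite is a hypothetical helper: FileChannel.force(true) asks for both the file content and its metadata to be written to the storage device before returning, whereas force(false) only requires the content, which is roughly the fsync versus fdatasync distinction. How RocksDB itself chooses between these is governed by its own options and is not shown here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper: write a file and force content and metadata to disk. */
class DurableWrite {
    static void writeDurably(Path file, String content) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true);  // content and metadata reach storage before returning
        }
    }
}
```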










[jira] [Created] (FLINK-19011) Parallelize the restoreOperation in OperatorStateBackend

2020-08-20 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-19011:
--

 Summary: Parallelize the restoreOperation in OperatorStateBackend 
 Key: FLINK-19011
 URL: https://issues.apache.org/jira/browse/FLINK-19011
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.11.1
Reporter: Jiayi Liao


To restore union state, each subtask needs to read the state handles produced 
by all operator instances. Currently, Flink iterates over the state handles one 
by one during the restore operation, which can take tens of minutes when there 
are more than ten thousand state handles. 

To speed up the process, I propose parallelizing the random reads on HDFS and 
the deserialization. We can create a runnable for each state handle that returns 
the metadata and deserialized data, which are then aggregated in the main 
thread.
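A minimal sketch of the proposal with a JDK thread pool. ParallelRestore, restoreAll and readAndDeserialize are hypothetical; readAndDeserialize stands in for the random HDFS read plus deserialization of one state handle. invokeAll returns the futures in the same order as the submitted tasks, so the main thread can aggregate the results deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical parallel restore: one task per state handle. */
class ParallelRestore {
    static <H, R> List<R> restoreAll(List<H> handles, Function<H, R> readAndDeserialize, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (H handle : handles) {
                tasks.add(() -> readAndDeserialize.apply(handle));  // runs in the pool
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : pool.invokeAll(tasks)) {  // same order as tasks
                results.add(future.get());                   // aggregated in the caller
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```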







Re: [ANNOUNCE] New PMC member: Piotr Nowojski

2020-07-06 Thread Jiayi Liao
Congratulations Piotr!

Best,
Jiayi Liao

On Tue, Jul 7, 2020 at 10:54 AM Jark Wu  wrote:

> Congratulations Piotr!
>
> Best,
> Jark
>
> On Tue, 7 Jul 2020 at 10:50, Yuan Mei  wrote:
>
> > Congratulations, Piotr!
> >
> > On Tue, Jul 7, 2020 at 1:07 AM Stephan Ewen  wrote:
> >
> > > Hi all!
> > >
> > > It is my pleasure to announce that Piotr Nowojski joined the Flink PMC.
> > >
> > > Many of you may know Piotr from the work he does on the data processing
> > > runtime and the network stack, from the mailing list, or the release
> > > manager work.
> > >
> > > Congrats, Piotr!
> > >
> > > Best,
> > > Stephan
> > >
> >
>


[jira] [Created] (FLINK-18348) RemoteInputChannel should checkError before checking partitionRequestClient

2020-06-17 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-18348:
--

 Summary: RemoteInputChannel should checkError before checking partitionRequestClient
 Key: FLINK-18348
 URL: https://issues.apache.org/jira/browse/FLINK-18348
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.10.1
Reporter: Jiayi Liao


If a remote channel fails to request the partition at the beginning, the error 
is set and {{partitionRequestClient}} stays null. The task then fails 
[here|https://github.com/apache/flink/blob/2150533ac0b2a6cc00238041853bbb6ebf22cee9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java#L172]
when the task thread tries to fetch data from the channel, without reporting the 
error that was set earlier.
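A hypothetical simplification of the proposed ordering. SketchRemoteChannel is not Flink's RemoteInputChannel, and the client is a plain Object: getNextBuffer() rethrows a previously recorded error before checking that the partition request client exists, so the task fails with the real cause instead of a generic null check.

```java
import java.io.IOException;

/** Hypothetical channel that reports a recorded error before other checks. */
class SketchRemoteChannel {
    private volatile Throwable cause;
    private volatile Object partitionRequestClient;

    void onPartitionRequestSucceeded(Object client) {
        partitionRequestClient = client;
    }

    void onPartitionRequestFailed(Throwable t) {
        cause = t;  // the client stays null because the request never succeeded
    }

    String getNextBuffer() throws IOException {
        checkError();  // surface the original failure first
        if (partitionRequestClient == null) {
            throw new IllegalStateException("partition request client not set");
        }
        return "buffer";  // placeholder for the real buffer
    }

    private void checkError() throws IOException {
        if (cause != null) {
            throw new IOException("Partition request failed", cause);
        }
    }
}
```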







[jira] [Created] (FLINK-18203) Reduce objects usage in redistributing union states

2020-06-09 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-18203:
--

 Summary: Reduce objects usage in redistributing union states
 Key: FLINK-18203
 URL: https://issues.apache.org/jira/browse/FLINK-18203
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.10.1
Reporter: Jiayi Liao


{{RoundRobinOperatorStateRepartitioner#repartitionUnionState}} creates a new 
{{OperatorStreamStateHandle}} instance for every {{StreamStateHandle}} instance 
used in every execution, so the number of new {{OperatorStreamStateHandle}} 
instances can reach m * n (job vertex parallelism * number of 
{{StreamStateHandle}}s across all executions). 

But in fact, all executions can share the same collection of 
{{StreamStateHandle}}s, because it is *union state*. 
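A hypothetical sketch of the sharing idea with simplified types (UnionStateRedistribution is not Flink's repartitioner, and handles are generic placeholders). For union state every subtask receives all handles, so one immutable list can be handed to all m subtasks instead of building m copies of n wrapper objects. Sharing is only safe if no subtask mutates the collection, hence the unmodifiable view.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical union-state redistribution sharing a single handle list. */
class UnionStateRedistribution {
    static <H> List<List<H>> redistribute(List<H> allHandles, int parallelism) {
        List<H> shared = Collections.unmodifiableList(new ArrayList<>(allHandles));
        List<List<H>> perSubtask = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(shared);  // same instance: O(n) objects instead of O(m * n)
        }
        return perSubtask;
    }
}
```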







Re: [ANNOUNCE] New Flink Committer: Benchao Li

2020-06-09 Thread Jiayi Liao
Congratulations!


Best,
Jiayi Liao

On Tue, Jun 9, 2020 at 2:32 PM Dian Fu  wrote:

> Congrats Benchao!
>
> Regards,
> Dian
>
> > On Jun 9, 2020, at 2:30 PM, Xintong Song wrote:
> >
> > Congratulations, Benchao~!
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jun 9, 2020 at 2:16 PM Jingsong Li 
> wrote:
> >
> >> Congratulations, Benchao. Well deserved!
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Tue, Jun 9, 2020 at 2:13 PM Forward Xu 
> wrote:
> >>
> >>> Congratulations, Benchao
> >>>
> >>> Best,
> >>> Forward
> >>>
> >>> On Tue, Jun 9, 2020 at 2:10 PM Jark Wu wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> On behalf of the PMC, I'm very happy to announce Benchao Li as a new Apache
> >>>> Flink committer.
> >>>>
> >>>> Benchao started contributing to Flink since late 2018. He is very active in
> >>>> Flink SQL component,
> >>>> and has also participated in many discussions, bug fixes. Over the past few
> >>>> months, he helped tremendously in answering user questions in the mailing
> >>>> list.
> >>>>
> >>>> Please join me in congratulating Benchao for becoming a Flink committer!
> >>>>
> >>>> Thanks,
> >>>> Jark
> >>>>
> >>>
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
>
>


Re: [ANNOUNCE] New Apache Flink Committer - Xintong Song

2020-06-04 Thread Jiayi Liao
Congratulations!

Best,
Jiayi Liao

On Fri, Jun 5, 2020 at 1:48 PM Biao Liu  wrote:

> Congrats!
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Fri, 5 Jun 2020 at 13:32, Thomas Weise  wrote:
>
> > Congratulations!
> >
> >
> > On Thu, Jun 4, 2020, 10:17 PM Yuan Mei  wrote:
> >
> > > Congrats, Xintong!
> > >
> > > On Fri, Jun 5, 2020 at 12:45 PM Becket Qin wrote:
> > >
> > > > Hi all,
> > > >
> > > > On behalf of the PMC, I’m very happy to announce Xintong Song as a
> > > > new Flink committer.
> > > >
> > > > Xintong started contributing to Flink about two years ago and has
> > > > been active since. His major work is in Flink resource management,
> > > > and he has also participated in discussions, bug fixes and answering
> > > > questions.
> > > >
> > > > Please join me in congratulating Xintong on becoming a Flink
> > > > committer!
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > >
> >
>


[jira] [Created] (FLINK-18068) Job stuck after IllegalStateException in scheduling

2020-06-02 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-18068:
--

 Summary: Job stuck after IllegalStateException in scheduling
 Key: FLINK-18068
 URL: https://issues.apache.org/jira/browse/FLINK-18068
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.10.1
Reporter: Jiayi Liao


If any non-fatal exception is thrown while interacting with YARN, the job stops making progress but stays alive, doing nothing forever. Here is an example:


{code:java}

java.lang.IllegalStateException: The RMClient's and YarnResourceManagers internal state about the number of pending container requests for resource  has diverged. Number client's pending container requests 40 != Number RM's pending container requests 0.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.yarn.YarnResourceManager.getPendingRequestsAndCheckConsistency(YarnResourceManager.java:518) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.yarn.YarnResourceManager.onContainersOfResourceAllocated(YarnResourceManager.java:431) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:395) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-ljy-1.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-ljy-1.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-ljy-1.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-ljy-1.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18067) Invalid default value for yarnMinAllocationMB in YarnClusterDescriptor

2020-06-02 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-18067:
--

 Summary: Invalid default value for yarnMinAllocationMB in 
YarnClusterDescriptor
 Key: FLINK-18067
 URL: https://issues.apache.org/jira/browse/FLINK-18067
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.10.1
Reporter: Jiayi Liao


Currently Flink uses 0 as the default value of {{yarnMinAllocationMB}}, which crashes the job when it normalizes the requested memory. After a discussion with [~fly_in_gis], we propose two minor changes:

* Use {{YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB}} as the default value instead of 0.
* Add a pre-check that {{yarnMinAllocationMB}} is greater than 0.
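A minimal sketch of both proposed changes, assuming that normalization rounds a request up to the next multiple of the minimum allocation (the exact Flink code path is not reproduced here). {{YarnMinAllocationSketch}}, {{resolveMinAllocationMb}} and {{normalize}} are hypothetical names; the 1024 MB default mirrors Hadoop's {{YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB}} and is copied locally so the sketch has no Hadoop dependency.

```java
public class YarnMinAllocationSketch {
    // Mirrors Hadoop's YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB (1024 MB);
    // copied here so the sketch compiles without Hadoop on the classpath.
    static final int DEFAULT_MIN_ALLOCATION_MB = 1024;

    // Hypothetical helper: fall back to YARN's default instead of 0 when the value
    // is not configured, and reject non-positive values up front.
    static int resolveMinAllocationMb(Integer configured) {
        int value = (configured == null) ? DEFAULT_MIN_ALLOCATION_MB : configured;
        if (value <= 0) {
            throw new IllegalArgumentException(
                "yarn.scheduler.minimum-allocation-mb must be greater than 0, but was " + value);
        }
        return value;
    }

    // Round a memory request up to the next multiple of the minimum allocation.
    // With minAllocationMb == 0 the integer division throws ArithmeticException.
    static int normalize(int requestedMb, int minAllocationMb) {
        return ((requestedMb + minAllocationMb - 1) / minAllocationMb) * minAllocationMb;
    }
}
```

With a minimum of 0, {{normalize}} fails with an {{ArithmeticException}} (division by zero), which is one way such a crash can arise; the pre-check turns it into an explicit configuration error instead.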



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17453) KyroSerializer throws IndexOutOfBoundsException type java.util.PriorityQueue

2020-04-28 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-17453:
--

 Summary: KyroSerializer throws IndexOutOfBoundsException type 
java.util.PriorityQueue
 Key: FLINK-17453
 URL: https://issues.apache.org/jira/browse/FLINK-17453
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.9.0
Reporter: Jiayi Liao



{code:java}
2020-04-28 22:28:18,659 INFO  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer  - IndexOutOfBoundsException type java.util.PriorityQueue source data is: [2, 0, 0, 0, 0, 0, 0, 0].
2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d).
2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task - GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:967)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:941)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 more
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:361)
at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
at org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction.getWindowAggregationResult(GeneralWindowProcessFunction.java:73)
at org.apache.flink.table.runtime.operators.window.WindowOperator.emitWindowResult(WindowOperator.java:434)
at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:422)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more
Caused by: java.io.EOFException: No more bytes left.
... 26 m

[jira] [Created] (FLINK-17320) Java8 lambda expression cannot be serialized.

2020-04-22 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-17320:
--

 Summary: Java8 lambda expression cannot be serialized.
 Key: FLINK-17320
 URL: https://issues.apache.org/jira/browse/FLINK-17320
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System, Table SQL / Runtime
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Code to reproduce:


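{code:java}
import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.junit.Test;

public class PriorityQueueKryoSerializationTest {
    @Test
    public void test() throws IOException {
        // The comparator is a plain (non-serializable) Java 8 lambda.
        PriorityQueue<String> pq = new PriorityQueue<>((o1, o2) -> Integer.compare(o1.length(), o2.length()));
        pq.add("1234135");
        pq.add("12323424135");

        KryoSerializer<PriorityQueue> kryoSerializer =
            new KryoSerializer<>(PriorityQueue.class, new ExecutionConfig());
        kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
    }
}
{code}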

The following NPE is thrown:


{code:java}
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:67)
at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:307)
at org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
{code}
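The comparator in the reproduction is a plain lambda, whose runtime class does not implement {{java.io.Serializable}}. The JDK-only sketch below (the class name is hypothetical) shows the difference an intersection cast makes. Whether Flink's {{KryoSerializer}} can then handle a {{PriorityQueue}} built with such a comparator depends on Kryo's lambda support and is not verified here.

```java
import java.io.Serializable;
import java.util.Comparator;

public class LambdaSerializabilitySketch {
    // A plain lambda: the generated class implements only Comparator, not Serializable.
    static Comparator<String> plainComparator() {
        return (o1, o2) -> Integer.compare(o1.length(), o2.length());
    }

    // An intersection cast makes javac emit a serializable lambda
    // (its class also implements Serializable).
    static Comparator<String> serializableComparator() {
        return (Comparator<String> & Serializable)
            (o1, o2) -> Integer.compare(o1.length(), o2.length());
    }
}
```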




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17134) Wrong logging information in Kafka010PartitionDiscoverer#L80

2020-04-14 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-17134:
--

 Summary: Wrong logging information in 
Kafka010PartitionDiscoverer#L80
 Key: FLINK-17134
 URL: https://issues.apache.org/jira/browse/FLINK-17134
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: Jiayi Liao


https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java#L80


{code:java}
throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
{code} 

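is equivalent to {{String.format(topic)}}: {{format}} is a static method, so the string literal is ignored and the topic name itself is used as the format string. It should be replaced with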

{code:java}
throw new RuntimeException(String.format("Could not fetch partitions for %s. Make sure that the topic exists.", topic));
{code}
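A small, self-contained demonstration of the pitfall (the class and method names are hypothetical):

```java
public class FormatPitfallSketch {
    // Buggy pattern: String.format is static, so the literal receiver is evaluated
    // and discarded, and `topic` itself becomes the format string.
    static String buggyMessage(String topic) {
        return "Could not fetch partitions for %s. Make sure that the topic exists.".format(topic);
    }

    // Fixed pattern: the template is the format string and the topic is its argument.
    static String fixedMessage(String topic) {
        return String.format("Could not fetch partitions for %s. Make sure that the topic exists.", topic);
    }
}
```

For a topic named {{my-topic}}, {{buggyMessage}} returns just {{my-topic}}, while {{fixedMessage}} returns the full sentence.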



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] New Flink committer: Seth Wiesman

2020-04-07 Thread Jiayi Liao
>
> Congratulations Seth :)
>
>


Re: [ANNOUNCE] New Committers and PMC member

2020-04-01 Thread Jiayi Liao
Congratulations to you all!

On Wed, Apr 1, 2020 at 5:05 PM Arvid Heise  wrote:

> Congratulations!
>
> On Wed, Apr 1, 2020 at 11:03 AM Dian Fu  wrote:
>
> > Congratulations to you all.
> >
> >
> > > On Apr 1, 2020, at 5:00 PM, Robert Metzger wrote:
> > >
> > > Welcome & congratulations to all of you!
> > >
> > >
> > > On Wed, Apr 1, 2020 at 10:58 AM Jingsong Li wrote:
> > >
> > >> Congratulations! Konstantin, Dawid, Zhijiang. Well deserved.
> > >>
> > >> Best,
> > >> Jingsong Lee
> > >>
> > >> On Wed, Apr 1, 2020 at 4:52 PM Stephan Ewen  wrote:
> > >>
> > >>> Hi all!
> > >>>
> > >>> Happy to announce that over the last few weeks, several people in the
> > >>> community joined in new roles:
> > >>>
> > >>>  - Konstantin Knauf joined as a committer. You may know him, for
> > >>> example, from the weekly community updates.
> > >>>
> > >>>  - Dawid Wysakowicz joined the PMC. Dawid is one of the main developers
> > >>> on the Table API.
> > >>>
> > >>>  - Zhijiang Wang joined the PMC. Zhijiang is a veteran of Flink's
> > >>> network / data shuffle system.
> > >>>
> > >>> A warm welcome to your new roles in the Flink project!
> > >>>
> > >>> Best,
> > >>> Stephan
> > >>>
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> >
> >
>


[jira] [Created] (FLINK-16755) Savepoint docs should be updated

2020-03-24 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16755:
--

 Summary: Savepoint docs should be updated
 Key: FLINK-16755
 URL: https://issues.apache.org/jira/browse/FLINK-16755
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Jiayi Liao


We should update this 
[paragraph|https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage]
 to reflect the State Processor API. 

cc [~sjwiesman]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16753) Exception from AsyncCheckpointRunnable should be wrapped in CheckpointException

2020-03-24 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16753:
--

 Summary: Exception from AsyncCheckpointRunnable should be wrapped 
in CheckpointException
 Key: FLINK-16753
 URL: https://issues.apache.org/jira/browse/FLINK-16753
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.10.0
Reporter: Jiayi Liao


If an exception is thrown while a task is performing an async checkpoint, the checkpoint is declined and classified as {{CheckpointFailureReason.JOB_FAILURE}}, which gives users a misleading message.

I think we can simply replace
{code:java}
owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), 
checkpointException);
{code}

with
 
{code:java}
owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), 
new CheckpointException(CheckpointFailureReason.EXCEPTION, 
checkpointException));
{code}
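A minimal model of why the wrapping matters, under the assumption (taken from the description above) that the declining side reads the reason only from a {{CheckpointException}} and treats any other cause as a job failure. All types here are hypothetical stand-ins, not Flink classes:

```java
public class DeclineReasonSketch {
    // Hypothetical subset of failure reasons, named after CheckpointFailureReason.
    enum FailureReason { JOB_FAILURE, EXCEPTION }

    // Hypothetical stand-in for a checkpoint exception that carries an explicit reason.
    static final class CheckpointException extends Exception {
        final FailureReason reason;

        CheckpointException(FailureReason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    // Assumed classification: only a CheckpointException carries its own reason;
    // any other cause falls back to JOB_FAILURE.
    static FailureReason classify(Throwable declineCause) {
        if (declineCause instanceof CheckpointException) {
            return ((CheckpointException) declineCause).reason;
        }
        return FailureReason.JOB_FAILURE;
    }
}
```

Under this model, passing the raw exception yields {{JOB_FAILURE}}, while wrapping it as proposed yields {{EXCEPTION}}.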

cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16620) Add attempt information in logging

2020-03-16 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16620:
--

 Summary: Add attempt information in logging
 Key: FLINK-16620
 URL: https://issues.apache.org/jira/browse/FLINK-16620
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Jiayi Liao


Currently, logging in places such as {{Task}} and {{StreamTask}} uses {{taskNameWithSubtasks}} from {{TaskInfo}} to identify an execution. I think it would be more user-friendly to include the attempt number in these log messages.

The format can be consistent with the logging in {{Execution}}:

{code:java}
MySink (3/10) - execution #0
{code}
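A sketch of such a helper, assuming the two inputs would come from {{TaskInfo#getTaskNameWithSubtasks()}} and {{TaskInfo#getAttemptNumber()}}; {{describeExecution}} is a hypothetical name:

```java
public class AttemptLoggingSketch {
    // Hypothetical helper: append the attempt number to taskNameWithSubtasks so that
    // Task/StreamTask log lines match the "<name> (i/n) - execution #k" style.
    static String describeExecution(String taskNameWithSubtasks, int attemptNumber) {
        return String.format("%s - execution #%d", taskNameWithSubtasks, attemptNumber);
    }
}
```

For example, {{describeExecution("MySink (3/10)", 0)}} produces {{MySink (3/10) - execution #0}}.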




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16597) tryCleanupOnError should happen before close

2020-03-14 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16597:
--

 Summary: tryCleanupOnError should happen before close
 Key: FLINK-16597
 URL: https://issues.apache.org/jira/browse/FLINK-16597
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Jiayi Liao


If {{DataSinkTask}} fails or throws an exception, {{tryCleanupOnError}} is called on the {{OutputFormat}} before {{close}}. But when {{DataSinkTask}} is cancelled, the order is reversed, which doesn't make much sense: the output format is only cleaned up after it has already been closed.

I'm not sure whether this is a bug, but it does mislead our developers when they implement an {{OutputFormat}}.
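A sketch of a single shutdown path that both failure and cancellation could share, so the order is always cleanup first, then close. {{RecordingFormat}} and {{shutdownAfterUnsuccessfulRun}} are hypothetical; the method names only mirror {{tryCleanupOnError}} and {{close}} from the description:

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOrderSketch {
    // Hypothetical output format that records the order of lifecycle calls.
    static final class RecordingFormat {
        final List<String> calls = new ArrayList<>();

        void tryCleanupOnError() { calls.add("tryCleanupOnError"); }

        void close() { calls.add("close"); }
    }

    // Shared path for failure and cancellation: clean up first, then close,
    // and close even if the cleanup throws.
    static void shutdownAfterUnsuccessfulRun(RecordingFormat format) {
        try {
            format.tryCleanupOnError();
        } finally {
            format.close();
        }
    }
}
```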



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16242) BinaryGeneric serialization error cause checkpoint failure

2020-02-23 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16242:
--

 Summary: BinaryGeneric serialization error cause checkpoint failure
 Key: FLINK-16242
 URL: https://issues.apache.org/jira/browse/FLINK-16242
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.9.2
Reporter: Jiayi Liao
 Attachments: error_serialization

The serialization error occurs intermittently when we use {{RoaringBitmap}} as the accumulator of a UDAF.

I've attached a screenshot of the error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16051) subtask id in Checkpoint UI not consistent with TaskUI

2020-02-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16051:
--

 Summary: subtask id in Checkpoint UI not consistent with TaskUI
 Key: FLINK-16051
 URL: https://issues.apache.org/jira/browse/FLINK-16051
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.10.0
Reporter: Jiayi Liao
 Attachments: checkpointui.png, taskui.png

The subtask ID in the Subtask UI starts from 0, but in the Checkpoint UI it starts from 1.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16042) Add state benchmark for append operation in AppendingState

2020-02-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16042:
--

 Summary: Add state benchmark for append operation in AppendingState
 Key: FLINK-16042
 URL: https://issues.apache.org/jira/browse/FLINK-16042
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Affects Versions: 1.10.0
Reporter: Jiayi Liao


As discussed [here|https://github.com/dataArtisans/flink-benchmarks/issues/47], we currently don't have a benchmark for the append operation of {{AppendingState}}, which is frequently used in stateful operators such as {{WindowOperator}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16017) Improve attachJobGraph Performance

2020-02-12 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16017:
--

 Summary: Improve attachJobGraph Performance
 Key: FLINK-16017
 URL: https://issues.apache.org/jira/browse/FLINK-16017
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Jiayi Liao


Currently {{RegionPartitionReleaseStrategy}} and {{AdaptedRestartPipelinedRegionStrategyNG}} both compute the distinct pipelined regions, and this duplicate computation hurts performance.

The best idea I have so far is to compute the distinct pipelined regions once in {{DefaultExecutionTopology}}, so that the duplicate computation can be removed.
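A minimal sketch of "compute once, share everywhere", assuming single-threaded access during job graph construction. {{Topology}} stands in for {{DefaultExecutionTopology}}, and regions are plain sets of vertex names; none of this is Flink's actual API:

```java
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class SharedRegionsSketch {
    // Hypothetical topology that computes the distinct pipelined regions lazily,
    // exactly once, and hands the same result to every consumer.
    // Not thread-safe: assumes single-threaded access (an assumption of this sketch).
    static final class Topology {
        private final Supplier<List<Set<String>>> computation;
        private List<Set<String>> cachedRegions;
        int computeCount = 0;

        Topology(Supplier<List<Set<String>>> computation) {
            this.computation = computation;
        }

        List<Set<String>> getDistinctRegions() {
            if (cachedRegions == null) {
                computeCount++;
                cachedRegions = computation.get();
            }
            return cachedRegions;
        }
    }
}
```

Both the partition release strategy and the failover strategy would then call {{getDistinctRegions()}} instead of recomputing the regions themselves.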





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16001) Avoid using Java Streams in construction of ExecutionGraph

2020-02-11 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16001:
--

 Summary: Avoid using Java Streams in construction of ExecutionGraph
 Key: FLINK-16001
 URL: https://issues.apache.org/jira/browse/FLINK-16001
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Jiayi Liao


I think we should avoid {{Java Streams}} when constructing the {{ExecutionGraph}}, e.g. in {{toPipelinedRegionsSet}} in {{PipelinedRegionComputeUtil}}, because job submission is performance-sensitive, especially when {{distinctRegions}} has a large cardinality.

The same applies to some other places in the package {{org.apache.flink.runtime.executiongraph}}.
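The sketch below contrasts a stream pipeline with an equivalent loop for a hypothetical region-to-ID-set conversion; it is not the actual {{toPipelinedRegionsSet}} code. Whether the loop is measurably faster for a given job should be confirmed with a benchmark:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RegionConversionSketch {
    // Stream-based version: map each region (a set of vertices) to the set of its IDs.
    static <V, ID> Set<Set<ID>> withStreams(Collection<Set<V>> regions, Function<V, ID> idOf) {
        return regions.stream()
            .map(region -> region.stream().map(idOf).collect(Collectors.toSet()))
            .collect(Collectors.toSet());
    }

    // Equivalent plain-loop version with pre-sized sets (sized to roughly avoid rehashing).
    static <V, ID> Set<Set<ID>> withLoops(Collection<Set<V>> regions, Function<V, ID> idOf) {
        Set<Set<ID>> result = new HashSet<>(regions.size() * 2);
        for (Set<V> region : regions) {
            Set<ID> ids = new HashSet<>(region.size() * 2);
            for (V vertex : region) {
                ids.add(idOf.apply(vertex));
            }
            // Each ID set is fully built before being added, so its hash stays stable.
            result.add(ids);
        }
        return result;
    }
}
```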

cc [~trohrmann] [~zhuzh] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15923) Remove DISCARDED in TaskAcknowledgeResult

2020-02-05 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-15923:
--

 Summary: Remove DISCARDED in TaskAcknowledgeResult
 Key: FLINK-15923
 URL: https://issues.apache.org/jira/browse/FLINK-15923
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.2
Reporter: Jiayi Liao


{{TaskAcknowledgeResult.DISCARDED}} is returned only when the checkpoint is 
discarded and removed from the {{pendingCheckpoints}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14996) Missing java doc in org.apache.flink.runtime.operators.TempBarrier

2019-11-30 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14996:
--

 Summary: Missing java doc in 
org.apache.flink.runtime.operators.TempBarrier
 Key: FLINK-14996
 URL: https://issues.apache.org/jira/browse/FLINK-14996
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.9.1
Reporter: Jiayi Liao


The Javadoc of {{org.apache.flink.runtime.operators.TempBarrier}} is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-26 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14950:
--

 Summary: Support getKey in WindowOperator.Context
 Key: FLINK-14950
 URL: https://issues.apache.org/jira/browse/FLINK-14950
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.9.1
Reporter: Jiayi Liao


In our scenario, users need to access the current key in {{WindowOperator.Context}} to decide how to handle the window.

I think it's reasonable to add a {{getKey()}} method to {{WindowOperator.Context}}. 

cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re:Cron style for checkpoint

2019-11-20 Thread Jiayi Liao
Hi Shuwen,

As far as I know, Flink only supports triggering checkpoints at a fixed interval.

However, I think a more flexible mechanism for triggering checkpoints is worth working on, at least from my perspective, and it need not be limited to a cron style. In our business scenario, the data traffic usually reaches its daily peak after 20:00, at which point we want to increase the checkpoint interval; otherwise checkpoints introduce more disk and network IO.

Just wanted to share this :)

Best,

Jiayi Liao




At 2019-11-21 10:20:47, "shuwen zhou"  wrote:
>Hi Community,
>I would like to know if there is an existing function to support cron style
>checkpoint?
>The case is, our data traffic is huge on HH:30 for each hour. We don't want
>checkpoint to fall in that range of time. A cron like 15,45 * * * * to set
>for checkpoint would be nice. If a checkpoint is already in progress when
>minutes is 15 or 45, there would be a config value to trigger a new
>checkpoint or pass.
>
>-- 
>Best Wishes,
>Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
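Computing such a schedule is straightforward with {{java.time}}; the sketch below returns the next trigger time for minute-of-hour values such as 15 and 45 (the "15,45 * * * *" example above). {{MinuteScheduleSketch}} is hypothetical, and wiring it into Flink's checkpoint triggering is not covered, since Flink offers no such hook as noted in the reply:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;

public class MinuteScheduleSketch {
    // Returns the first whole minute at or after `now` whose minute-of-hour
    // is contained in `minutes`, e.g. [15, 45] for "15,45 * * * *".
    static LocalDateTime nextTrigger(LocalDateTime now, List<Integer> minutes) {
        LocalDateTime candidate = now.truncatedTo(ChronoUnit.MINUTES);
        if (candidate.isBefore(now)) {
            candidate = candidate.plusMinutes(1);
        }
        // Scanning one hour of candidates is enough to hit every minute-of-hour once.
        for (int i = 0; i < 60; i++) {
            if (minutes.contains(candidate.getMinute())) {
                return candidate;
            }
            candidate = candidate.plusMinutes(1);
        }
        throw new IllegalArgumentException("no valid minute in " + minutes);
    }

    // Delay a scheduler would wait before triggering the next checkpoint.
    static Duration delayUntilNextTrigger(LocalDateTime now, List<Integer> minutes) {
        return Duration.between(now, nextTrigger(now, minutes));
    }
}
```

At 10:20:30 with minutes [15, 45], the next trigger is 10:45:00, i.e. a delay of 24 minutes 30 seconds.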


[jira] [Created] (FLINK-14833) Remove unnecessary SnapshotStrategySynchronicityBehavior from HeapSnapshotStrategy

2019-11-17 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14833:
--

 Summary: Remove unnecessary SnapshotStrategySynchronicityBehavior 
from HeapSnapshotStrategy
 Key: FLINK-14833
 URL: https://issues.apache.org/jira/browse/FLINK-14833
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.9.1
Reporter: Jiayi Liao


All methods that {{HeapSnapshotStrategy}} implements from {{SnapshotStrategySynchronicityBehavior}} simply delegate to {{snapshotStrategySynchronicityTrait}}, following the same pattern: 
{code:java}
@Override
public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
    snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
}

@Override
public boolean isAsynchronous() {
    return snapshotStrategySynchronicityTrait.isAsynchronous();
}

@Override
public <N, V> StateTable<K, N, V> newStateTable(
    InternalKeyContext<K> keyContext,
    RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo,
    TypeSerializer<K> keySerializer) {
    return snapshotStrategySynchronicityTrait.newStateTable(keyContext, newMetaInfo, keySerializer);
}
{code}

It looks like {{HeapSnapshotStrategy}} does not need to implement {{SnapshotStrategySynchronicityBehavior}}, so we could remove the interface from it together with the related {{@Override}} annotations. In addition, {{HeapSnapshotStrategy}} doesn't match the Javadoc of {{SnapshotStrategySynchronicityBehavior}}.


 cc [~liyu]

Please correct me if there is a reason for keeping it.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14766) Remove volatile variable in ExecutionVertex

2019-11-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14766:
--

 Summary: Remove volatile variable in ExecutionVertex
 Key: FLINK-14766
 URL: https://issues.apache.org/jira/browse/FLINK-14766
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Since operations in {{ExecutionVertex}} are already single-threaded, I think we can remove the {{volatile}} modifier from {{currentExecution}} and {{locationConstraint}}.

The same applies to {{Execution}}.

cc [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14744) Neighboring vertex should be chained if ship strategy is FORWARD or NONE

2019-11-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14744:
--

 Summary: Neighboring vertex should be chained if ship strategy is 
FORWARD or NONE
 Key: FLINK-14744
 URL: https://issues.apache.org/jira/browse/FLINK-14744
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Assume I have a job whose unchained graph looks like this:
{code:java}
Data Source (parallelism=5)  ->  Map (parallelism=5)  ->  DataSink (parallelism=5)
{code}
The {{JobGraphGenerator}} chains {{Data Source}} and {{Map}}, but uses {{FORWARD}} to connect the chained task and the {{DataSink}} task instead of chaining {{DataSink}} as well.
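A simplified version of the chaining rule the summary proposes. {{ShipStrategy}} lists a few constants named after Flink's {{ShipStrategyType}}; the equal-parallelism condition is an extra assumption, and the real {{JobGraphGenerator}} checks further conditions that are omitted here:

```java
public class ChainingSketch {
    // Hypothetical subset of ship strategies, named after Flink's ShipStrategyType.
    enum ShipStrategy { FORWARD, NONE, PARTITION_HASH, BROADCAST }

    // Simplified predicate: chain neighbouring vertices when data is shipped locally
    // (FORWARD or NONE) and, as an extra assumption, parallelism matches.
    static boolean shouldChain(ShipStrategy strategy, int upstreamParallelism, int downstreamParallelism) {
        boolean localShip = strategy == ShipStrategy.FORWARD || strategy == ShipStrategy.NONE;
        return localShip && upstreamParallelism == downstreamParallelism;
    }
}
```

Applied to the example above, the {{Map}} to {{DataSink}} edge ({{FORWARD}}, parallelism 5 on both sides) would be chained.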



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14736) Fail to connect Hive metastore

2019-11-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14736:
--

 Summary: Fail to connect Hive metastore
 Key: FLINK-14736
 URL: https://issues.apache.org/jira/browse/FLINK-14736
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.9.0
Reporter: Jiayi Liao
 Attachments: hivelog

I created a very simple Flink program using the Hive connector. According to the logs, the program fails to connect to the Hive metastore, while Spark connects to the same metastore without problems. 

Code:
{code:java}
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
fbEnv.setParallelism(10);
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

val hiveCatalog = new HiveCatalog("hive", null, 
"/data00/tiger/spark_deploy/spark-stable/conf", "1.2.1")
fbTableEnv.registerCatalog("hive", hiveCatalog)

fbTableEnv.useCatalog("hive")
val testTable = fbTableEnv.sqlQuery("select count(1) from 
ies_antispam_test.big_graph_white_seeds_20191101_lxb")

val dataSet = fbTableEnv.toDataSet[Row](testTable)

dataSet.print()

fbTableEnv.execute("Hive Query");
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14646) Check null for key in KeyGroupStreamPartitioner

2019-11-06 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14646:
--

 Summary: Check null for key in KeyGroupStreamPartitioner
 Key: FLINK-14646
 URL: https://issues.apache.org/jira/browse/FLINK-14646
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.9.0
Reporter: Jiayi Liao


We should check {{key}} for null in the {{selectChannel}} method of {{KeyGroupStreamPartitioner}}, because {{KeyGroupRangeAssignment}} does not allow null keys. If a null key reaches {{KeyGroupStreamPartitioner}}, the resulting exception is not clear enough:


{code:java}
Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:60)
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:49)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:58)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
... 9 more
{code}
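A sketch of the proposed fail-fast check. Inside Flink, {{Preconditions.checkNotNull(reference, errorMessage)}} would be the natural choice; the sketch uses {{java.util.Objects}} to stay dependency-free, and its channel computation is a hypothetical stand-in for {{KeyGroupRangeAssignment.assignKeyToParallelOperator}}, not Flink's real key-group hashing:

```java
import java.util.Objects;

public class NullKeyCheckSketch {
    // Hypothetical selectChannel: fail fast with a descriptive message when the key
    // selector returned null, instead of an NPE deep inside key-group assignment.
    static int selectChannel(Object key, int numberOfChannels) {
        Objects.requireNonNull(key,
            "Assigned key must not be null. Check that the KeySelector does not return null.");
        // Stand-in for the key-group based assignment; NOT Flink's murmur-hash scheme.
        return Math.floorMod(key.hashCode(), numberOfChannels);
    }
}
```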




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14519) Fail on invoking declineCheckpoint remotely

2019-10-24 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14519:
--

 Summary: Fail on invoking declineCheckpoint remotely
 Key: FLINK-14519
 URL: https://issues.apache.org/jira/browse/FLINK-14519
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.1
Reporter: Jiayi Liao


When {{declineCheckpoint}} is invoked remotely, the reason field of {{DeclineCheckpoint}} cannot be serialized if it is a {{RocksDBException}}, because {{org.rocksdb.Status}} does not implement {{Serializable}}.
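The failure can be reproduced with plain Java serialization, without RocksDB or Flink. In the sketch below, {{Status}} and {{NativeException}} are hypothetical stand-ins for {{org.rocksdb.Status}} and {{RocksDBException}}, and {{toSerializableCopy}} shows one possible workaround; Flink's own {{org.apache.flink.util.SerializedThrowable}} may be a more natural fit for a real fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class NonSerializableCauseSketch {
    // Hypothetical stand-in for org.rocksdb.Status: does NOT implement Serializable.
    static final class Status {
        final String code;
        Status(String code) { this.code = code; }
    }

    // Hypothetical stand-in for RocksDBException: Throwable is Serializable,
    // but this subclass holds a non-serializable field.
    static final class NativeException extends Exception {
        final Status status;

        NativeException(String message, Status status) {
            super(message);
            this.status = status;
        }
    }

    // Java-serialize an object, as an RPC layer does before a remote call.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Possible workaround: keep class name, message and stack trace,
    // but drop the non-serializable state.
    static Exception toSerializableCopy(Throwable t) {
        Exception copy = new Exception(t.getClass().getName() + ": " + t.getMessage());
        copy.setStackTrace(t.getStackTrace());
        return copy;
    }
}
```

Serializing a {{NativeException}} directly fails with {{java.io.NotSerializableException}}, matching the root cause in the trace, while the copy serializes normally.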

Exception:


{panel:title=My title}
Caused by: java.io.IOException: Could not serialize 0th argument of method declineCheckpoint. This indicates that the argument type [Ljava.lang.Object; is not serializable. Arguments have to be serializable for remote rpc calls.
at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation$MethodInvocation.writeObject(RemoteRpcInvocation.java:186)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:53)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:264)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
... 18 more
Caused by: java.io.NotSerializableException: org.rocksdb.Status
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at java.lang.Throwable.writeObject(Throwable.java:985)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation$MethodInvocation.writeObject(RemoteRpcInvocation.java:182)
... 33 more
{panel}
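The failure can be reproduced without Flink or RocksDB. In this sketch, NativeStatus and NativeException are hypothetical stand-ins for org.rocksdb.Status and RocksDBException: serializing the exception fails on the non-serializable field, as in the trace above. toSerializable shows one possible workaround, copying the class name, message and stack trace into plain Exceptions before the RPC call. Flink's org.apache.flink.util.SerializedThrowable is built around a similar idea, but this is not its implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

final class DeclineReasonSketch {

    /** Stand-in for org.rocksdb.Status: deliberately does NOT implement Serializable. */
    static final class NativeStatus {
        final String code;

        NativeStatus(String code) {
            this.code = code;
        }
    }

    /** Stand-in for RocksDBException: the Throwable is Serializable, its status field is not. */
    static final class NativeException extends Exception {
        final NativeStatus status;

        NativeException(String message, NativeStatus status) {
            super(message);
            this.status = status;
        }
    }

    /** Returns true if Java serialization (as used for remote RPC arguments) accepts the object. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Copies class name, message and stack trace (recursively for causes) into plain
     * Exceptions, dropping any non-serializable fields before the RPC call.
     */
    static Exception toSerializable(Throwable t) {
        Exception copy = new Exception(t.getClass().getName() + ": " + t.getMessage());
        copy.setStackTrace(t.getStackTrace());
        if (t.getCause() != null) {
            copy.initCause(toSerializable(t.getCause()));
        }
        return copy;
    }
}
```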






[jira] [Created] (FLINK-14463) Merge Handover in flink-connector-kafka and flink-connector-kafka-0.9

2019-10-19 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14463:
--

 Summary: Merge Handover in flink-connector-kafka and 
flink-connector-kafka-0.9
 Key: FLINK-14463
 URL: https://issues.apache.org/jira/browse/FLINK-14463
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.9.0
Reporter: Jiayi Liao


Handover.java exists in both the flink-connector-kafka (Kafka 2.x) module and the 
flink-connector-kafka-0.9 module. We should move this file into the Kafka base 
module to avoid duplicated code.

cc [~sewen] [~yanghua]





[jira] [Created] (FLINK-14384) ValidationMatcher in FlinkSqlParserImplTest fails to test negative tests

2019-10-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-14384:
--

 Summary: ValidationMatcher in FlinkSqlParserImplTest fails to test 
negative tests
 Key: FLINK-14384
 URL: https://issues.apache.org/jira/browse/FLINK-14384
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: Jiayi Liao


In ValidationMatcher, failMsg is used to check the expected exception. However, if 
the SQL is validated successfully without throwing an exception, the #matches 
method still returns true.

Take {{testInsertWithInvalidPartitionColumns}} as an example: the test still 
passes even if you give it a wrong failMsg.
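A simplified sketch of the difference, assuming validation either returns normally or throws a RuntimeException. The method names are hypothetical and this is not the actual ValidationMatcher code: the buggy shape returns true when nothing is thrown, while the fixed shape only succeeds without an exception if no failMsg was expected.

```java
import java.util.function.Consumer;

final class ValidationMatcherSketch {

    /** Buggy shape: if validation does not throw, the matcher falls through to true. */
    static boolean buggyMatches(Consumer<String> validator, String sql, String failMsg) {
        try {
            validator.accept(sql);
        } catch (RuntimeException e) {
            return failMsg != null && e.getMessage() != null && e.getMessage().contains(failMsg);
        }
        return true; // BUG: a negative test passes although nothing failed
    }

    /** Fixed shape: when failMsg is set, validation must throw an exception that mentions it. */
    static boolean fixedMatches(Consumer<String> validator, String sql, String failMsg) {
        try {
            validator.accept(sql);
        } catch (RuntimeException e) {
            return failMsg != null && e.getMessage() != null && e.getMessage().contains(failMsg);
        }
        // No exception: only a positive test (no expected failure) may succeed.
        return failMsg == null;
    }
}
```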





[jira] [Created] (FLINK-13854) Support Aggregating in Join and CoGroup

2019-08-26 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-13854:
--

 Summary: Support Aggregating in Join and CoGroup
 Key: FLINK-13854
 URL: https://issues.apache.org/jira/browse/FLINK-13854
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.9.0
Reporter: Jiayi Liao


In WindowedStream we can use windowedStream.aggregate(AggregateFunction, 
WindowFunction) to aggregate input records incrementally as they arrive.

I think we should support a similar API in JoinedStreams and CoGroupedStreams, 
because storing every record in the state backend is very costly, especially 
when we don't need the individual records.
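A minimal sketch of the idea, not an existing JoinedStreams or CoGroupedStreams API: records from both inputs are folded into a fixed-size accumulator as they arrive, so state holds two numbers per window instead of every record. Agg is a hypothetical, simplified stand-in for Flink's AggregateFunction (merge() is omitted), and SideSums is just an example aggregation.

```java
final class IncrementalCoGroupSketch {

    /** Simplified stand-in for Flink's AggregateFunction (merge() omitted). */
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
    }

    /** An input record tagged with the side of the co-group it came from. */
    static final class Tagged {
        final boolean left;
        final long value;

        Tagged(boolean left, long value) {
            this.left = left;
            this.value = value;
        }
    }

    /** Sums each side separately; the accumulator is a fixed-size long[2]. */
    static final class SideSums implements Agg<Tagged, long[], String> {
        public long[] createAccumulator() {
            return new long[2];
        }

        public long[] add(Tagged t, long[] acc) {
            acc[t.left ? 0 : 1] += t.value;
            return acc;
        }

        public String getResult(long[] acc) {
            return "left=" + acc[0] + ", right=" + acc[1];
        }
    }

    /** Feeds records into the accumulator as they arrive, without buffering them. */
    static <IN, ACC, OUT> OUT aggregateOnArrival(Iterable<IN> arrivals, Agg<IN, ACC, OUT> agg) {
        ACC acc = agg.createAccumulator();
        for (IN in : arrivals) {
            acc = agg.add(in, acc);
        }
        return agg.getResult(acc);
    }
}
```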



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-11019) Expose the queryable proxy server port in taskinfo api

2018-11-28 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-11019:
--

 Summary: Expose the queryable proxy server port in taskinfo api
 Key: FLINK-11019
 URL: https://issues.apache.org/jira/browse/FLINK-11019
 Project: Flink
  Issue Type: New Feature
Reporter: Jiayi Liao
Assignee: Jiayi Liao


Currently we can't get the queryable state proxy server port from outside a Flink 
job. This is especially hard when the port is configured as a range, because the 
port actually used is only known at runtime.

We should expose this port in the taskinfo API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10807) KafkaConsumer still consume removed topic after changing topics list

2018-11-06 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10807:
--

 Summary: KafkaConsumer still consume removed topic after changing 
topics list
 Key: FLINK-10807
 URL: https://issues.apache.org/jira/browse/FLINK-10807
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.6.2
Reporter: Jiayi Liao
Assignee: Jiayi Liao


subscribedPartitionsToStartOffsets in KafkaConsumerBase gets its values from 
restoredState, which is initialized in initializeState and during partition 
discovery. However, if we remove a topic from the topics list and restore the 
Flink program, restoredState still contains the removed topic, so the fetcher 
keeps fetching data from it.
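One possible fix, sketched under the assumption that restored offsets are kept in a map keyed by topic partition: before handing partitions to the fetcher, drop every restored entry whose topic is no longer subscribed. TopicPartition here is a hypothetical stand-in for Flink's KafkaTopicPartition, and a pattern-based subscription would need a pattern match instead of a set lookup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class RestoredOffsetsSketch {

    /** Minimal stand-in for a Kafka topic partition (hypothetical, for illustration). */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Keeps only restored offsets whose topic is still in the subscribed topics list. */
    static Map<TopicPartition, Long> retainSubscribed(
            Map<TopicPartition, Long> restoredState, Set<String> subscribedTopics) {
        Map<TopicPartition, Long> result = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restoredState.entrySet()) {
            if (subscribedTopics.contains(entry.getKey().topic)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```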





[jira] [Created] (FLINK-10806) Support consume from earliest offset with a new topic

2018-11-06 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10806:
--

 Summary: Support consume from earliest offset with a new topic
 Key: FLINK-10806
 URL: https://issues.apache.org/jira/browse/FLINK-10806
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.6.2
Reporter: Jiayi Liao
Assignee: Jiayi Liao


In KafkaConsumerBase, we discover the TopicPartitions and compare them with 
restoredState, and partitions that are not in restoredState are consumed from the 
start. This is reasonable when a topic's partitions are scaled out. However, if we 
add a new topic that already holds a lot of data and restore the Flink program, 
the new topic is also consumed from the start, which may not be what we want. I 
think this should be an option for developers.
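A sketch of how such an option could decide the start offset, assuming restored offsets are grouped by topic. NewTopicStart and the sentinel values are hypothetical: known partitions resume from the checkpoint, new partitions of a known topic keep today's start-from-earliest behavior, and only partitions of a topic absent from the restored state follow the option.

```java
import java.util.Map;

final class NewTopicStartSketch {

    /** Hypothetical user option for partitions of topics that are not in the restored state. */
    enum NewTopicStart { EARLIEST, LATEST }

    // Hypothetical sentinels meaning "resolve to the earliest / latest offset at fetch time".
    static final long EARLIEST_SENTINEL = -2L;
    static final long LATEST_SENTINEL = -1L;

    /** restored maps topic -> (partition -> offset to resume from), taken from the checkpoint. */
    static long startOffset(String topic, int partition,
                            Map<String, Map<Integer, Long>> restored, NewTopicStart option) {
        Map<Integer, Long> partitions = restored.get(topic);
        if (partitions != null) {
            Long offset = partitions.get(partition);
            // Known partition: resume from the checkpoint.
            // New partition of a known topic (scale-out): read it from the start.
            return offset != null ? offset : EARLIEST_SENTINEL;
        }
        // Partition of a topic that was added to the subscription before the restore.
        return option == NewTopicStart.LATEST ? LATEST_SENTINEL : EARLIEST_SENTINEL;
    }
}
```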





[jira] [Created] (FLINK-10654) AllowNonRestoredState should skip state that cannot be restored

2018-10-23 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10654:
--

 Summary: AllowNonRestoredState should skip state that cannot be 
restored
 Key: FLINK-10654
 URL: https://issues.apache.org/jira/browse/FLINK-10654
 Project: Flink
  Issue Type: Wish
Reporter: Jiayi Liao
Assignee: Jiayi Liao


We have a -n/--allowNonRestoredState option when submitting a job from the command 
line. It is used to skip savepoint state that cannot be mapped to any operator of 
the new program, but I think it would be better if it could also skip the state of 
operators that throw exceptions during restore.

This way, we could still recover most of the state in the other operators after 
modifying only a small part of the state-related code.
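A sketch of the proposed semantics, with hypothetical names: each operator's restore runs separately, and when skipping is enabled a failing operator is recorded instead of failing the whole restore. How a skipped operator then starts (presumably with empty state) is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class LenientRestoreSketch {

    /** Hypothetical per-operator restore step that may fail, e.g. after a serializer change. */
    interface StateRestorer {
        void restore() throws Exception;
    }

    /**
     * Restores each operator in iteration order. With skipFailing enabled, an operator
     * whose restore throws is recorded and skipped; otherwise the exception is rethrown.
     * Returns the ids of the skipped operators.
     */
    static List<String> restoreAll(Map<String, StateRestorer> operators, boolean skipFailing)
            throws Exception {
        List<String> skipped = new ArrayList<>();
        for (Map.Entry<String, StateRestorer> op : operators.entrySet()) {
            try {
                op.getValue().restore();
            } catch (Exception e) {
                if (!skipFailing) {
                    throw e;
                }
                skipped.add(op.getKey());
            }
        }
        return skipped;
    }
}
```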





[jira] [Created] (FLINK-10641) Slow when applying new containers

2018-10-22 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10641:
--

 Summary: Slow when applying new containers
 Key: FLINK-10641
 URL: https://issues.apache.org/jira/browse/FLINK-10641
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.1
Reporter: Jiayi Liao
Assignee: Jiayi Liao


When requesting containers from YARN, the containers are received and returned 
over and over again, like this:
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1535124617388_1936_01_000929 - Remaining pending container requests: 0
14:36:19,486 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1535124617388_1936_01_000929.

Sometimes this goes on for several minutes, which is much longer than we expect.





[jira] [Created] (FLINK-10578) Support writable state in QueryableStateClient

2018-10-16 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10578:
--

 Summary: Support writable state in QueryableStateClient
 Key: FLINK-10578
 URL: https://issues.apache.org/jira/browse/FLINK-10578
 Project: Flink
  Issue Type: New Feature
Reporter: Jiayi Liao
Assignee: Jiayi Liao








[jira] [Created] (FLINK-10539) Support specific namespace in QueryableStateClient

2018-10-12 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10539:
--

 Summary: Support specific namespace in QueryableStateClient
 Key: FLINK-10539
 URL: https://issues.apache.org/jira/browse/FLINK-10539
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.6.0
Reporter: Jiayi Liao
Assignee: Jiayi Liao


The only method we can use to get KvState is getKvState, which always queries the 
state in stateTable with VoidNamespace. I think we should add a new method that 
lets users specify the namespace.





[jira] [Created] (FLINK-10350) Add assignTimestampsAndWatermarks in KeyedStream

2018-09-15 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10350:
--

 Summary: Add assignTimestampsAndWatermarks in KeyedStream
 Key: FLINK-10350
 URL: https://issues.apache.org/jira/browse/FLINK-10350
 Project: Flink
  Issue Type: New Feature
Reporter: Jiayi Liao








[jira] [Created] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-14 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10348:
--

 Summary: Solve data skew when consuming data from kafka
 Key: FLINK-10348
 URL: https://issues.apache.org/jira/browse/FLINK-10348
 Project: Flink
  Issue Type: New Feature
  Components: Kafka Connector
Affects Versions: 1.6.0
Reporter: Jiayi Liao
Assignee: Jiayi Liao


With KafkaConsumer, our strategy is to send fetch requests to the brokers with a 
fixed fetch size. Assume topic x has n partitions with data skew between them, and 
we need to consume topic x from the earliest offset, receiving at most the max 
fetch size of data in every fetch request. The problem is that when a task 
consumes data from both "big" and "small" partitions, the data in the "big" 
partitions may become late elements, because the "small" partitions are consumed 
faster.

*Solution:*
I think we can leverage two parameters to control this.
1. data.skew.check // whether to check data skew
2. data.skew.check.interval // the interval between checks
Every data.skew.check.interval, we check the latest offset of each partition, 
calculate (latest offset - current offset), then pick the partitions that need to 
slow down and redefine their fetch size.
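The check above leaves open how the new fetch sizes are chosen. This sketch shows one possible policy, assuming a hypothetical per-partition fetch size: each partition gets a share of the maximum fetch size proportional to its lag relative to the largest lag, with a floor so no partition stalls completely. It would run once per data.skew.check.interval.

```java
import java.util.HashMap;
import java.util.Map;

final class SkewAwareFetchSizeSketch {

    /**
     * One possible policy: scale each partition's fetch size by its lag relative to the
     * largest lag, so partitions with little backlog are throttled and all partitions
     * progress at a similar relative pace. Keys are partition ids.
     */
    static Map<Integer, Integer> fetchSizes(Map<Integer, Long> latestOffsets,
                                            Map<Integer, Long> currentOffsets,
                                            int maxFetchBytes, int minFetchBytes) {
        Map<Integer, Long> lags = new HashMap<>();
        long maxLag = 0;
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            // lag = latest offset - current offset (never negative)
            long lag = Math.max(0L, e.getValue() - currentOffsets.getOrDefault(e.getKey(), 0L));
            lags.put(e.getKey(), lag);
            maxLag = Math.max(maxLag, lag);
        }
        Map<Integer, Integer> sizes = new HashMap<>();
        for (Map.Entry<Integer, Long> e : lags.entrySet()) {
            int size = maxLag == 0
                    ? maxFetchBytes
                    : (int) (maxFetchBytes * e.getValue().doubleValue() / maxLag);
            sizes.put(e.getKey(), Math.max(minFetchBytes, size));
        }
        return sizes;
    }
}
```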





[jira] [Created] (FLINK-10306) Support the display of log file from any position on webUI

2018-09-08 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10306:
--

 Summary: Support the display of log file from any position on webUI
 Key: FLINK-10306
 URL: https://issues.apache.org/jira/browse/FLINK-10306
 Project: Flink
  Issue Type: Improvement
Reporter: Jiayi Liao
Assignee: Jiayi Liao


Although we copy the whole log file from the TaskManager to the blob service host, 
sometimes we cannot read the whole file's content on the web UI because it is too 
much for the browser to load.
We already use RandomAccessFile to read files, so I think we should support reading 
the log file from any line by adding a parameter at the end of the URL, like 
http://xxx/#/taskmanager/container_xx/log/100.
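A minimal sketch of reading from a given line with RandomAccessFile, assuming the number at the end of the URL is mapped to a 0-based startLine (that mapping and the page size are hypothetical). RandomAccessFile.readLine() reads one byte at a time and decodes bytes as Latin-1, and skipping lines still scans from the beginning; a real handler might instead accept a byte offset and call seek() directly.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

final class LogFromLineSketch {

    /** Returns up to maxLines lines starting at the 0-based line number startLine. */
    static List<String> readFromLine(String path, long startLine, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            // Skip the lines before startLine.
            for (long i = 0; i < startLine; i++) {
                if (file.readLine() == null) {
                    return lines; // the file has fewer lines than startLine
                }
            }
            // Read one page of lines.
            String line;
            while (lines.size() < maxLines && (line = file.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
```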





[jira] [Created] (FLINK-10284) TumblingEventTimeWindows's offset should can be less than zero.

2018-09-05 Thread Jiayi Liao (JIRA)
Jiayi Liao created FLINK-10284:
--

 Summary: TumblingEventTimeWindows's offset should can be less than 
zero.
 Key: FLINK-10284
 URL: https://issues.apache.org/jira/browse/FLINK-10284
 Project: Flink
  Issue Type: Bug
Reporter: Jiayi Liao


My goal is to create a window from midnight to the next day's midnight in the GMT+8 
time zone, so I chose TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)), 
which aligns the "day" range to UTC. However, TumblingEventTimeWindows does not 
allow an offset of -8 hours, which would let me compensate for the time zone 
difference.
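The window-start arithmetic shows why the offset matters. This sketch is modelled on the formula that, to my understanding, Flink's TimeWindow.getWindowStartWithOffset uses; treat it as an illustration of the arithmetic, not as the library code. For a one-day window, an offset of -8 hours and an offset of +16 hours produce the same window boundaries, at midnight GMT+8.

```java
final class WindowOffsetSketch {

    static final long HOUR_MS = 3_600_000L;
    static final long DAY_MS = 24 * HOUR_MS;

    /**
     * Start of the tumbling window that contains `timestamp` (all values in epoch millis).
     * The offset shifts the window grid; for a one-day window, -8h and +16h give the same grid.
     */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}
```

For example, with ts = 1536116400000L (2018-09-05T03:00:00Z), both windowStart(ts, -8 * HOUR_MS, DAY_MS) and windowStart(ts, 16 * HOUR_MS, DAY_MS) return 1536076800000L, i.e. 2018-09-04T16:00:00Z, which is midnight in GMT+8.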


