Re: Window computation: discontinuous data causes delayed window triggering

2021-11-03 Thread wang edmond
Hello:

You can set an idleness timeout for watermark generation, so that once the idle time passes without any events, a watermark is still emitted.

See the "Dealing With Idle Sources" section of the official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
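
A minimal Java sketch of that configuration (the event type and the durations
here are just for illustration, they are not from the original thread):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessExample {
    // Sketch: events are plain Long timestamps, purely for illustration.
    static WatermarkStrategy<Long> strategy() {
        return WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, previousTimestamp) -> event)
                // after 1 minute without events the stream is marked idle, so
                // downstream watermark progress is no longer held back by this
                // source/partition and pending windows can fire
                .withIdleness(Duration.ofMinutes(1));
    }
}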





From: Gen Luo
Sent: November 3, 2021 15:14
To: user-zh@flink.apache.org
Subject: Re: Window computation: discontinuous data causes delayed window triggering

The WatermarkGenerator interface has onEvent and onPeriodicEmit; onPeriodicEmit
is called periodically, so you could implement logic there that emits a newly
computed watermark once onEvent has not been called for a certain amount of time.
If the new watermark is later than the end times of the windows covered by the
current watermark, it should trigger all pending windows.
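
A rough Java sketch of that idea (this class is an assumption written for
illustration, not something from the thread): track when onEvent was last
called and, in onPeriodicEmit, push the watermark forward after a configurable
idle timeout.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class IdleAwareWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private final long idleTimeoutMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventSystemTime = System.currentTimeMillis();

    public IdleAwareWatermarkGenerator(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventSystemTime = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events seen yet
        }
        if (System.currentTimeMillis() - lastEventSystemTime > idleTimeoutMs) {
            // no events for a while: advance the watermark past the last window
            // end so that pending windows can be triggered
            output.emitWatermark(new Watermark(maxTimestamp + idleTimeoutMs));
        } else {
            output.emitWatermark(new Watermark(maxTimestamp - 1));
        }
    }
}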

On Mon, Nov 1, 2021 at 5:20 PM yuankuo.xia 
wrote:

> hi
>
>
> Background: I am using event-time windows for aggregation, but the data is not continuous. For example, data flows in during both period A and period B, but there is a 30-minute gap with no incoming data between them.
>
>
> Problem: Because the data is discontinuous, the last window of period A does not fire; it only fires once new data flows in.
>
>
> Is there a way to solve this, e.g. trigger all windows after a period with no incoming data? I looked at the Trigger interface but could not come up with a good implementation.


Re: Writing Hive tables with Flink SQL: how to handle timestamp-pattern when the partition is yyyyMMdd instead of yyyy-MM-dd

2021-11-03 Thread Jingsong Li
You can provide a custom partition.time-extractor.class to do the parsing yourself.

Flink should probably offer a corresponding partition.time-extractor.kind to
support your case out of the box.
I have created a JIRA for that: https://issues.apache.org/jira/browse/FLINK-24758
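
A hedged sketch of such a custom extractor (the class name, the assumed
dt/hour partition layout, and the exact package of PartitionTimeExtractor,
which may differ between Flink versions, are all assumptions):

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Referenced via 'partition.time-extractor.kind'='custom' and
// 'partition.time-extractor.class'='...CompactDatePartitionTimeExtractor'.
public class CompactDatePartitionTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter COMPACT_DATE = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // assumption: the partitions are (dt, hour), e.g. dt=20211103, hour=11
        LocalDate date = LocalDate.parse(partitionValues.get(0), COMPACT_DATE);
        int hour = Integer.parseInt(partitionValues.get(1));
        return date.atTime(hour, 0);
    }
}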

Best,
Jingsong

On Thu, Nov 4, 2021 at 11:47 AM yidan zhao  wrote:
>
> As the subject says, I currently do select date_format(xxx, 'yyyyMMdd') as dt...
>
> partition.time-extractor.timestamp-pattern is set to something like $dt $hour:00:00.
>
> But this causes an error; it seems this value must be in the yyyy-MM-dd hh:mm:ss format?



-- 
Best, Jingsong Lee


Re: Custom partitioning of keys with keyBy

2021-11-03 Thread naitong Xiao
I think I had a similar scenario several months ago, here is my related code:

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy { value =>
  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
  s"$KEY_RAND_SALT$keyGroup"
}

The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was generated 
by some script to map bucket id evenly to operators under the max parallelism.

On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov , wrote:
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit 
> differently than how Flinks default partitioning works with key groups.
>
> 
> What I'd like to be able to do is take all my data and split it up evenly 
> between 3 buckets which will store the data in the state. Using the key above 
> works, but splits the data unevenly between the different key groups, as 
> usually the key space is very small (0 - 3). What ends up happening is that 
> sometimes 50% of the keys end up on the same operator index, where ideally 
> I'd like to distribute it evenly between all operator indexes in the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.


Writing Hive tables with Flink SQL: how to handle timestamp-pattern when the partition is yyyyMMdd instead of yyyy-MM-dd

2021-11-03 Thread yidan zhao
As the subject says, I currently do select date_format(xxx, 'yyyyMMdd') as dt...

partition.time-extractor.timestamp-pattern is set to something like $dt $hour:00:00.

But this causes an error; it seems this value must be in the yyyy-MM-dd hh:mm:ss format?


Re: Statefun remote functions - acessing kafka headers from a remote function

2021-11-03 Thread Igal Shilman
Hi Fil,

The default Kafka ingress that ships with StateFun indeed doesn't bundle
the headers with the incoming message, so there is no way of getting them
at the moment, without doing some work :(
I'd also be happy to kick off a discussion (I guess JIRA would be the
right place) about supporting this feature.

The way I'd approach it at the moment is by adding a custom ingress via
the embedded SDK. Unfortunately this might be a little involved,
but I have this example project [1] for an older version of StateFun that
you can use as a starting point; if you are interested, I can point you to
the changes required for it to work with the 3.x branch in subsequent
emails.

The most relevant part for you would be here [3][4]

The resulting artifact can be included within your StateFun application and
will make this new ingress available at the runtime [2]

[1] https://github.com/igalshilman/custom-ingress/
[2]
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/embedded/
[3]
https://github.com/igalshilman/custom-ingress/blob/main/src/main/java/com/igal/Module.java#L76
[4]
https://github.com/igalshilman/custom-ingress/blob/main/src/main/protobuf/messages.proto#L26

I hope this helps,
Igal.


On Tue, Nov 2, 2021 at 2:04 PM Filip Karnicki 
wrote:

> Hi, is there a neat way to access kafka headers from within a remote
> function without using the datastream api to insert the headers as part of
> a RoutableMessage payload?
>
> Many thanks
> Fil
>


How to tune memory settings for batch job using sort-merge?

2021-11-03 Thread Joern Kottmann
Hello!

I often use batch mode to validate that my pipeline can produce the
expected results over some fixed input data, that usually works very well
and definitely helps to find bugs in my user code.

I have one job that reads many TBs of data from S3 and then writes reduced
outputs back to S3.

This job (p=800, on EMR/YARN, m5.xlarge with EBS) used to run with the hash
blocking shuffle but then I started to see "too many open files" exceptions
while scaling up the input data and switched to sort-merge.

For sort-merge there are various parameters that need to be adjusted and
the excellent blog post about it [1] and the flink memory documentation
certainly helped to understand this to some degree.

My issue is that I haven't managed to find memory settings that don't result
in crashes.

If it runs with default settings, except for enabling sort-merge and
taskmanager.network.blocking-shuffle.compression.enabled: true, then it
would often crash with TimeoutExceptions in
SortMergeResultPartitionReadScheduler.allocateBuffers (line 168). The
exception recommends increasing
taskmanager.memory.framework.off-heap.batch-shuffle.size.

With an increased batch-shuffle.size: 128m and off-heap.size: 256m I would
still get the same exception.
Increasing this further would usually result in yarn killing containers
with the 137 exit code.

Increasing sort-shuffle.min-buffers, e.g. to 2048, has the same result: the
container is killed with exit code 137, even after adding more memory to the
network.
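
For reference, the combination described above corresponds roughly to a
flink-conf.yaml fragment like the following (a sketch of the values mentioned
so far, not a recommendation; taskmanager.network.sort-shuffle.min-parallelism
is the usual switch for enabling sort-merge):

taskmanager.network.sort-shuffle.min-parallelism: 1
taskmanager.network.blocking-shuffle.compression.enabled: true
taskmanager.memory.framework.off-heap.batch-shuffle.size: 128m
taskmanager.memory.framework.off-heap.size: 256m
taskmanager.network.sort-shuffle.min-buffers: 2048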

To circumvent this I tried to increase the jvm-overhead but this also
didn't help.

My operators don't need a lot of memory (no off-heap memory) and are
probably not causing this, e.g. one where this often fails is a reduce
which only keeps one object at a time.

The crashes usually happen after the job is almost finished.

What is a good approach to debug this?

Thanks,
Jörn

[1] https://flink.apache.org/2021/10/26/sort-shuffle-part1.html


Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Ali Bahadir Zeybek
Hello Qihua,

This will require you to implement and maintain your own database insertion
logic using any of the clients that your database and programming language
support. Bear in mind that you will be losing all the optimizations Flink's
connector provides for you, and this will add complexity to the amount of code
you will have to maintain. On the other hand, it will handle the case within
one job.

If you have more control over what you can do with your database, and the
latency to Kafka is not a major issue since there will be more moving parts,
then what @Francesco Guardiani suggested is also a good approach. You will
need to maintain more systems, i.e. Debezium, but less custom code.

Therefore, it is mostly up to your requirements and the resources you have
available to decide how to proceed.

Sincerely,

Ali Bahadir Zeybek





On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang  wrote:

> Many thanks guys!
> Hi Ali, for approach 2, what is the better way to do the database inserts
> for this case? Currently we simply use JDBC SQL connector to sink to
> database.
>
> Thanks,
> Qihua
>
> On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello Qihua,
>>
>> If you do not care with the events that are not committed to DB,
>> you can use Async I/O [1] and implement a logic that
>>
>>- does the database inserts
>>- completes the original events that are only accepted by DB
>>
>> You can then sink this new datastream to kafka.
>>
>> If you are also interested in the events that are not committed to DB,
>> you can use a Process Function [2] and implement a logic that
>>
>>- does the database inserts
>>- collects the original events that are only accepted by DB
>>- sends the ones that are not accepted by DB to a side output
>>
>> You can then sink this new datastream to kafka and maybe sideoutput to
>> another topic.
>>
>> Sincerely,
>>
>> Ali Bahadir Zeybek
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
>> [2]:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
>>
>> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> An alternative is to use a CDC tool like Debezium to stream your table
>>> changes, and then ingest that stream using Flink to push data later to
>>> Kafka.
>>>
>>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:
>>>
 Hi, Qihua

 AFAIK there is no way to do it. Maybe you need to implement a "new"
 sink to archive this target.

 Best,
 Guowei


 On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:

> Hi,
>
> Our flink application has two sinks(DB and kafka topic). We want to
> push same data to both sinks. Is it possible to push data to kafka topic
> only after data is pushed to DB successfully? If the commit to DB fail, we
> don't want those data is pushed to kafka.
>
> Thanks,
> Qihua
>



Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Qihua Yang
Many thanks guys!
Hi Ali, for approach 2, what is the better way to do the database inserts
for this case? Currently we simply use JDBC SQL connector to sink to
database.

Thanks,
Qihua

On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek  wrote:

> Hello Qihua,
>
> If you do not care with the events that are not committed to DB,
> you can use Async I/O [1] and implement a logic that
>
>- does the database inserts
>- completes the original events that are only accepted by DB
>
> You can then sink this new datastream to kafka.
>
> If you are also interested in the events that are not committed to DB,
> you can use a Process Function [2] and implement a logic that
>
>- does the database inserts
>- collects the original events that are only accepted by DB
>- sends the ones that are not accepted by DB to a side output
>
> You can then sink this new datastream to kafka and maybe sideoutput to
> another topic.
>
> Sincerely,
>
> Ali Bahadir Zeybek
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
> [2]:
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
>
> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> An alternative is to use a CDC tool like Debezium to stream your table
>> changes, and then ingest that stream using Flink to push data later to
>> Kafka.
>>
>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:
>>
>>> Hi, Qihua
>>>
>>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
>>> to archive this target.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>>>
 Hi,

 Our flink application has two sinks(DB and kafka topic). We want to
 push same data to both sinks. Is it possible to push data to kafka topic
 only after data is pushed to DB successfully? If the commit to DB fail, we
 don't want those data is pushed to kafka.

 Thanks,
 Qihua

>>>


Re: Custom partitioning of keys with keyBy

2021-11-03 Thread David Anderson
Another possibility, if you know in advance the values of the keys, is to
find a mapping that transforms the original keys into new keys that will,
in fact, end up in disjoint key groups that will, in turn, be assigned to
different slots (given a specific parallelism). This is ugly, but feasible.

For reference, the key group for a given key is

MathUtils.murmurHash(key.hashCode()) % maxParallelism

and a given key group will be assigned to the slot computed by

keyGroup * actualParallelism / maxParallelism
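
For illustration, a brute-force search based on those two formulas might look
like the sketch below (the class name and the salted-key format are assumptions;
KeyGroupRangeAssignment is Flink's internal utility):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.HashMap;
import java.util.Map;

public class KeySaltFinder {

    public static void main(String[] args) {
        int maxParallelism = 16;  // must match the job's configured max parallelism
        int parallelism = 3;      // actual parallelism of the keyed operator

        Map<Integer, String> saltedKeys = new HashMap<>();
        for (int bucket = 0; bucket < parallelism; bucket++) {
            for (int salt = 0; !saltedKeys.containsKey(bucket); salt++) {
                String candidate = bucket + "-" + salt;
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism);
                int slot = keyGroup * parallelism / maxParallelism;  // second formula above
                if (slot == bucket) {
                    saltedKeys.put(bucket, candidate);  // use this string as the key in keyBy
                }
            }
        }
        saltedKeys.forEach((bucket, key) ->
                System.out.println("bucket " + bucket + " -> key " + key));
    }
}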

David



On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Yuval,
>
>
>
> Just a couple of comments:
>
>
>
>- Assuming that all your 4 different keys are evenly distributed, and
>you send them to (only) 3 buckets, you would expect at least one bucket to
>cover 2 of your keys, hence the 50%
>- With low entropy keys avoiding data skew is quite difficult
>- But your situation could be worse, all 4 keys could end up in the
>same bucket, if the hash function in use happens to generate collisions for
>the 4 keys, in which case 2 of your 3 buckets would not process any events
>… this could also lead to watermarks not progressing …
>- There is two proposal on how to improve the situation:
>   - Use the same parallelism and max parallelism for the relevant
>   operators and implement a manual partitioner
>  - A manual partitioner is also good in situations where you want
>  to lower the bias and you exactly know the distribution of your key 
> space
>  and rearrange keys to even-out numbers
>   - More sophisticated (if possible), divide-and-conquer like:
>  - Key by your ‘small’ key plus soma arbitrary attribute with
>  higher entropy
>  - Window aggregate first on that artificial key
>  - Aggregate the results on your original ‘small’ key
>  - This could be interesting for high-throughput situation where
>  you actually want to run in parallelism higher than the number of 
> different
>  ‘small’ keys
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> *From:* Yuval Itzchakov 
> *Sent:* Mittwoch, 3. November 2021 14:41
> *To:* user 
> *Subject:* Custom partitioning of keys with keyBy
>
>
>
> Hi,
>
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flinks default partitioning works with key groups.
>
>
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
>
>
> Is there any way of doing this?
>
> --
>
> Best Regards,
> Yuval Itzchakov.
>


Re: What is Could not retrieve file from transient blob store?

2021-11-03 Thread John Smith
Ok I missed the log below. I guess when the task manager was stopped this
happened.

I attached the full sequence. But I guess it's ok and not a big issue???


2021-11-02 23:20:22,682 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Failed to transfer file from TaskExecutor 7e1
b7db5918004e4160fdecec1bbdad7.
java.util.concurrent.CompletionException: org.apache.flink.util.
FlinkException: Could not retrieve file from transient blob store.
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:135)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture$Completion.run(
CompletableFuture.java:456)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:515)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not retrieve file
from transient blob store.
... 10 more
Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
not exist and failed to copy from blob store.
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:516)
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:444)
at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:133)
... 9 more
2021-11-02 23:20:22,703 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Unhandled exception.
org.apache.flink.util.FlinkException: Could not retrieve file from transient
blob store.
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:135)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture$Completion.run(
CompletableFuture.java:456)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:515)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
not exist and failed to copy from blob store.
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:516)
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:444)
at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:133)
... 9 more

On Wed, 3 Nov 2021 at 02:48, Guowei Ma  wrote:

> Hi, Smith
>
> It seems that the log file(blob_t-274d3c2d5acd78ced877d89
> 8b1877b10b62a64df-590b54325d599a6782a77413691e0a7b) is deleted for some
> reason. But AFAIK there are no other guys reporting this exception.(Maybe
> other guys know what would happen).
> 1. I think if you could refresh the page and you would see the correct
> result because this would trigger another file retrieving from TM.
> 2. And It might be more safe that setting an dedicated blob
> directory path(other than /tmp) `blob.storage.directory`[1]
>
> [1]
> 

GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-03 Thread James Sandys-Lumsdaine
Hello,

I have a Flink workflow where I need to upload the output data into a legacy 
SQL Server database and so I have read the section in the Flink book about data 
sinks and utilizing the GenericWriteAheadSink base class. I am currently using 
Flink 1.12.3 although we plan to upgrade to 1.14 shortly.

Firstly, given I will be generating a large amount of data I feel it best to 
use the GenericWriteAheadSink base class so I can bulk copy all the data into 
my SQL Server database rather than attempt a row by row insertion which would 
be too slow. Hopefully this is a good use case for this class or is there now a 
better approach?

Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows but the 
program actually exits before a final checkpoint is taken, so I miss many of 
the final rows - I have to put in a Thread.sleep(5000) before allowing the JDBC 
source to exit. This might be related to FLINK-21215 as I see the following 
error:
org.apache.flink.util.SerializedThrowable: Task Name: Source: Trade JDBC Source 
(1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the 
sendValues() method.

I have included the test code below which just logs the "insertions" for now 
(and doesn't do real db access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource =
        new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
        new SqlServerBulkCopySink(
            new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
            TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
            UUID.randomUUID().toString()));

    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer,
                                 TypeSerializer<MyObj> serializer,
                                 String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        logger.info("Sending {},{}---", checkpointId, timestamp);
        for (MyObj myObj : objects)
            logger.info("  {},{}: {}", checkpointId, timestamp, myObj); // this will eventually
                // be a bulk copy insert into the SQL Server database
        return true;
    }
}



Am I right in thinking the latest versions of Flink will not suffer from this 
problem or am I hitting something else? To be clear, I am expecting a 
checkpoint to be invoked by Flink to cover all the data I want to insert into 
my DB - how else would I do the final bulk copy if my sendValues() is not 
called?


I have more questions about my data sink but I will wait to hear your answers.


Many thanks in advance,


James.



Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Ali Bahadir Zeybek
Hello Qihua,

If you do not care about the events that are not committed to the DB,
you can use Async I/O [1] and implement a logic that

   - does the database inserts
   - completes the original events that are only accepted by DB

You can then sink this new datastream to kafka.

If you are also interested in the events that are not committed to DB,
you can use a Process Function [2] and implement a logic that

   - does the database inserts
   - collects the original events that are only accepted by DB
   - sends the ones that are not accepted by DB to a side output

You can then sink this new datastream to Kafka and, if needed, route the side
output to another topic.

Sincerely,

Ali Bahadir Zeybek

[1]:
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
[2]:
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
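
As a rough sketch of the second approach (the MyEvent type with getId() and
getPayload(), the JDBC URL, and the INSERT statement are assumptions for
illustration, not part of this thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class DbFirstFunction extends ProcessFunction<MyEvent, MyEvent> {

    public static final OutputTag<MyEvent> REJECTED =
            new OutputTag<MyEvent>("rejected-by-db") {};

    private transient Connection connection;
    private transient PreparedStatement insert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // assumed connection URL and table, opened once per parallel instance
        connection = DriverManager.getConnection("jdbc:postgresql://db-host/mydb");
        insert = connection.prepareStatement("INSERT INTO my_table (id, payload) VALUES (?, ?)");
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<MyEvent> out) {
        try {
            insert.setLong(1, event.getId());
            insert.setString(2, event.getPayload());
            insert.executeUpdate();
            out.collect(event);            // accepted by the DB -> forward to the Kafka sink
        } catch (SQLException e) {
            ctx.output(REJECTED, event);   // not accepted -> side output
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

The main output would then go to the Kafka sink, while
getSideOutput(DbFirstFunction.REJECTED) can be routed to another topic.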

On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani 
wrote:

> An alternative is to use a CDC tool like Debezium to stream your table
> changes, and then ingest that stream using Flink to push data later to
> Kafka.
>
> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:
>
>> Hi, Qihua
>>
>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
>> to archive this target.
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> Our flink application has two sinks(DB and kafka topic). We want to push
>>> same data to both sinks. Is it possible to push data to kafka topic only
>>> after data is pushed to DB successfully? If the commit to DB fail, we don't
>>> want those data is pushed to kafka.
>>>
>>> Thanks,
>>> Qihua
>>>
>>


Re: Unsubscribe

2021-11-03 Thread Leonard Xu
If you want to unsubscribe from the user-zh@flink.apache.org mailing list, please send an email with any content to 
user-zh-unsubscr...@flink.apache.org

> On November 2, 2021, at 14:15, 李芳奎  wrote:
> 
> Unsubscribe
> 
> felix 
> 
> felix_...@163.com



RE: Custom partitioning of keys with keyBy

2021-11-03 Thread Schwalbe Matthias
Hi Yuval,

Just a couple of comments:


  *   Assuming that all your 4 different keys are evenly distributed and you 
send them to (only) 3 buckets, you would expect at least one bucket to cover 2 
of your keys, hence the 50%
  *   With low-entropy keys, avoiding data skew is quite difficult
  *   But your situation could be worse: all 4 keys could end up in the same 
bucket if the hash function in use happens to generate collisions for the 4 
keys, in which case 2 of your 3 buckets would not process any events … this 
could also lead to watermarks not progressing …
  *   There are two proposals on how to improve the situation:
     *   Use the same parallelism and max parallelism for the relevant 
operators and implement a manual partitioner
        *   A manual partitioner is also good in situations where you want to 
lower the bias and you know the distribution of your key space exactly, so you 
can rearrange keys to even out the numbers
     *   More sophisticated (if possible), divide-and-conquer like:
        *   Key by your ‘small’ key plus some arbitrary attribute with higher 
entropy
        *   Window-aggregate first on that artificial key
        *   Aggregate the results on your original ‘small’ key
        *   This could be interesting for high-throughput situations where you 
actually want to run with a parallelism higher than the number of different 
‘small’ keys

Hope this helps

Thias


From: Yuval Itzchakov 
Sent: Mittwoch, 3. November 2021 14:41
To: user 
Subject: Custom partitioning of keys with keyBy

Hi,
I have a use-case where I'd like to partition a KeyedDataStream a bit 
differently than how Flinks default partitioning works with key groups.

What I'd like to be able to do is take all my data and split it up evenly 
between 3 buckets which will store the data in the state. Using the key above 
works, but splits the data unevenly between the different key groups, as 
usually the key space is very small (0 - 3). What ends up happening is that 
sometimes 50% of the keys end up on the same operator index, where ideally I'd 
like to distribute it evenly between all operator indexes in the cluster.

Is there any way of doing this?
--
Best Regards,
Yuval Itzchakov.


Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Francesco Guardiani
An alternative is to use a CDC tool like Debezium to stream your table
changes, and then ingest that stream using Flink to push data later to
Kafka.

On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:

> Hi, Qihua
>
> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
> to archive this target.
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Our flink application has two sinks(DB and kafka topic). We want to push
>> same data to both sinks. Is it possible to push data to kafka topic only
>> after data is pushed to DB successfully? If the commit to DB fail, we don't
>> want those data is pushed to kafka.
>>
>> Thanks,
>> Qihua
>>
>


Re: [Statefun] Questions on recovery

2021-11-03 Thread Igal Shilman
Hello Hady,
Glad to see that you are testing StateFun!

Regarding that exception, I think that this is not the root cause. The root
cause is as you wrote that the StateFun job failed because it wasn't able
to deliver a message to a remote function in the given time frame.
If you look at the logs you most likely see a
StateFulFunctionInvocationException.

However, the Flink job should recover if checkpointing is enabled or a
restart strategy is set in your flink-conf.yaml.
You can refer to the default flink-conf.yaml [1] that we ship in the
official Docker image [2].
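
For example, a restart strategy can be set in flink-conf.yaml along these
lines (the values are illustrative, not the shipped defaults):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10s
execution.checkpointing.interval: 30s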

One more thing that I'd like to point out is that if the remote functions
are not able to keep up with the stream of requests,
you have a few strategies to deal with that:

* increase the total call timeout per invocation [3]
* reduce the maxNumberOfBatch requests to a lower value [4]
* reduce the total number of asynchronous in-flight requests: set the
following key in your flink-conf.yaml to something lower than 1024. Try and
experiment with different values.
 statefun.async.max-per-task: 256 # for example 256, 128,...
* enable autoscaling of your remote containers (if your
infrastructure allows)

[1]
https://github.com/apache/flink-statefun/blob/56c2f036a6831af885eb15539bc7962bb730b060/tools/docker/flink-distribution-template/conf/flink-conf.yaml
[2] https://hub.docker.com/r/apache/flink-statefun
[3]
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/#asynchronous-http-transport-beta
[4]
https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml#L20

Good luck,
Igal.

On Wed, Nov 3, 2021 at 10:59 AM Hady Januar Willi 
wrote:

> Hi everyone,
>
> When testing Flink statefun, the job eventually throws the following
> exception after failing to reach the endpoint or if the endpoint fails
> after the exponentially increasing delay.
>
> java.util.concurrent.RejectedExecutionException:
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox$MailboxClosedException:
> Mailbox is in state CLOSED, but is required to be in state OPEN for put
> operations.
>
> How do I recover from this state?
>
> Thank you.
>
> Regards,
> Hady
>


Re: Statefun embedded functions - parallel per partition, sequential per key

2021-11-03 Thread Igal Shilman
Glad to hear it worked out for you :-)

Cheers,
Igal

On Tue, Nov 2, 2021 at 1:57 PM Filip Karnicki 
wrote:

> Hi All
>
> Just an update for future reference, it turned out that the machine we
> were using for this test didn't have enough memory for what we were asking
> it to do. It was that simple. The upside is that not even with the world's
> most unstable cluster did we manage to lose a single message.
>
> Just as an aside, we got the best results by switching back to undertow,
> but we ended up using it slightly differently than the current example in
> the docs suggests. We needed to pass the work onto a worker thread because
> we had a blocking call in our funcion
>
> class Handler extends HttpHandler{
> (...)
>
>   def handleRequest(exchange: HttpServerExchange): Unit = {
> if (exchange.isInIoThread) {
>   exchange.dispatch(this)
>   return
> }
> exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
>   flinkHandler
> .handle(Slices.wrap(bytes))
> .whenComplete((response: Slice, exception: Throwable) =>
> onComplete(exchange, response, exception))
> })
>   }
>
>   def onComplete(exchange: HttpServerExchange, slice: Slice, throwable:
> Throwable) = (... as per the example)
>
> }
>
> Many thanks again for your help, Igal
>
> On Wed, 27 Oct 2021 at 13:59, Filip Karnicki 
> wrote:
>
>> Thanks for your reply Igal
>>
>> The reason why I'm using data stream integration is that the messages on
>> kafka are in .json, and I need to convert them to protobufs for embedded
>> functions. If I was using remote functions I wouldn't need to do that.
>>
>> With regards to performance, in order to exclude the possibility that
>> it's the remote service that's causing a slowdown, I replaced the undertow
>> example from the docs with 5 instances of webflux services that hand off
>> the work from an event loop to a worker which then sleeps for 1 second. I
>> then launched an nginx instance to forward the request in a round robin
>> fashion to the 5 webflux instances.
>>
>> When I push 10_000 messages onto the ingress kafka topic, it takes
>> upwards of 100 seconds to process all messages. The flink cluster first
>> works pretty hard for about 30 seconds (at ~100% of cpu utilisation) then
>> everything slows down and eventually I get tens of messages trickling down
>> until, after the flink-side statefun job (not the remote job) crashes and
>> gets restarted, which is when the last few stragglers get sent to the
>> egress after 120+ seconds from the launch of the test.
>>
>> I can try to replicate this outside of my work environment if you'd like
>> to run it yourself, but in the meantime, is there a way to achieve this
>> 'sequencial-per-key' behaviour with the use of embedded functions? Those
>> seem to be rock-solid. Maybe there are some internal classes that would at
>> least provide a template on how to do it? I have a naive implementation
>> ready (the one I described in the previous email) but I'm sure there are
>> some edge cases I haven't thought of.
>>
>> Thanks again,
>> Fil
>>
>>
>>
>> On Wed, 27 Oct 2021 at 09:43, Igal Shilman 
>> wrote:
>>
>>> Hello Fil,
>>>
>>> Indeed what you are describing is exactly what a remote function does.
>>>
>>> I am curious to learn more about the current performance limitations
>>> that you encounter with the remote functions.
>>>
>>> One thing to try in combination with the async transport, is to increase
>>> the total number of in fight async operations, by setting the following
>>> property in flink-conf.yaml:
>>>
>>> statefun.async.max-per-task
>>>
>>> To a much higher value than 1024, try experimenting with 16k,32k,64k and
>>> even higher.
>>>
>>> Let me know if that improves the situation, and we can continue from
>>> there.
>>>
>>> p.s,
>>>
>>> You've mentioned that you are using the data stream integration, where
>>> there any particular reason you chose that? It has some limitations at the
>>> moment with respect to remote functions.
>>>
>>>
>>> Cheers,
>>> Igal
>>>
>>> On Wed 27. Oct 2021 at 08:49, Filip Karnicki 
>>> wrote:
>>>
 Hi

 I have a kafka topic with json messages that I map to protobufs within
 a data stream, and then send those to embedded stateful functions using the
 datastream integration api (DataStream[RoutableMessage]). From there I need
 to make an idempotent long-running blocking IO call.

 I noticed that I was processing messages sequentially per kafka
 partition. Is there a way that I could process them sequentially by key
 only (but in parallel per partition)?

 I created some code that uses the embedded functions'
 registerAsyncOperation capabilities to make my long-running IO calls
 effectively asynchronous, but I had to add all this custom code to enqueue
 and persist any messages for a key that came in while there was an
 in-flight IO call happening for that key. I'm fairly confident that I can
 figure out 

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Yan Shen
Hi,

It will complicate things a lot if we cannot assume input order of any
operator after a keyBy. So far I only have the problem with countWindow
which I seem to be able to avoid by writing my own stateful KeyedProcess.
Are there other operators which might cause the same problem?
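
For reference, a rough sketch of such a stateful KeyedProcess - a sliding count
window of size 3 (slide 1) per key that emits the latest element in arrival
order. The Tuple2 types mirror the Pair<Int, Int> from the example below; this
sketch is an illustration, not the code actually used.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class SlidingCountWindow
        extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

    private static final int SIZE = 3;
    private transient ListState<Tuple2<Integer, Integer>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "buffer", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));
    }

    @Override
    public void processElement(Tuple2<Integer, Integer> value, Context ctx,
                               Collector<Tuple2<Integer, Integer>> out) throws Exception {
        List<Tuple2<Integer, Integer>> window = new ArrayList<>();
        buffer.get().forEach(window::add);
        window.add(value);
        if (window.size() > SIZE) {
            window.remove(0); // keep only the last SIZE elements per key
        }
        buffer.update(window);
        // the original "reduce { a, b -> b }" keeps the last element of each window
        out.collect(window.get(window.size() - 1));
    }
}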

The other alternative is not to use batch mode, but the problem is that I
won't know when a batch job finishes if I don't run it in batch mode, since a
streaming job never ends.

Thanks.

On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma  wrote:

> Hi, Yan
> I do not think it is a bug. Maybe we could not assume the input's order of
> an operator simply.
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen  wrote:
>
>> Yes, it does not happen in streaming mode. Is this considered a bug or is
>> it by design?
>>
>> Thanks!
>>
>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma  wrote:
>>
>>> Hi
>>>
>>> I did not run your program directly, but I see that you are now using
>>> the Batch execution mode. I suspect it is related to this, because in the
>>> Batch execution mode FLINK will "sort" the Key (this might be an unstable
>>> sort).
>>> So would you like to experiment with the results of running with
>>> Streaming mode and to see what happens?
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:
>>>
 Hi all,

 Can anyone advise on this?

 I wrote a simple test of the countWindow method (in Kotlin) as below

 package aero.airlab.flinkjobs.headingreminder

 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import 
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import kotlin.random.Random

 object CountWindowTest {
 @JvmStatic
 fun main(args: Array<String>) {
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.setRuntimeMode(RuntimeExecutionMode.BATCH)

 val rand = Random(0)
 val data = (0..1000).map { Pair(rand.nextInt(10), it) }
 env.fromCollection(data).assignTimestampsAndWatermarks(
 WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
 .withTimestampAssigner { e, _ -> e.second.toLong() })
 .keyBy { it.first }
 .countWindow(3L, 1)
 .reduce { a, b -> b }
 .keyBy { it.first }
 .filter { it.first == 5 }
 .print()

 env.execute()
 }
 }


 The beginning of the output is as such

 12> (5, 184)
 12> (5, 18)
 12> (5, 29)
 12> (5, 37)
 12> (5, 38)
 12> (5, 112)
 12> (5, 131)

 The first line (5, 184) is not in order from the rest.

 Is this a bug? The problem disappears if I remove the keyBy after the
 reduce.

 Thanks.

>>>


Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-03 Thread Nicolaus Weidner
Hi Fuyao,

I just wanted to say that the performance loss that you rightly suspected
when using savepoints (as opposed to checkpoints) may disappear with Flink
1.15. There should be no loss of functionality as far as checkpoints are
concerned.
I don't think the savepoint performance improvement goals are in the public
Flink Jira yet.

You are right about the default value of 1 checkpoint that is retained, but
you can change it if required:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#state-checkpoints-num-retained
Now that I think about it, you probably want to keep it at 1 intentionally
(I think that's what you implied) - so you can be sure that the single
checkpoint in the provided checkpoint directory is the latest retained
checkpoint. I haven't attempted something like this before, but it sounds
like it should work.
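For reference, a sketch of the relevant flink-conf.yaml keys (values are
illustrative):

state.checkpoints.num-retained: 1   # the default; keeping 1 means the single
                                    # retained checkpoint is the latest one
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION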

Best,
Nico

On Tue, Nov 2, 2021 at 10:14 PM Fuyao Li  wrote:

> Hi David, Nicolaus,
>
>
>
> Thanks for the reply.
>
>
>
>1. For your first question, Yes. I want to use the checkpoint to stop
>and restart the application. I think this is similar to the Reactive mode
>strategy, right? (I don’t know the exact implementation behind the Reactive
>mode). From your description and Nicolaus reply, I guess this improvement
>for checkpoint will benefit both Reactive mode and this workflow I designed
>instead of breaking this proposal, right?
>
>
>- *For Nicolaus, after such change in 1.15, do you mean the checkpoint
>can’t be used to restart a job? If this is the case, maybe my proposal will
>not work after 1.15…*
>
> Please share the Jira link to this design if possible and correct my
> statement if I am wrong.
>
>
>
> Nicolaus’s suggestion of leveraging retained checkpoint is exactly what I
> am trying to describe in my 3-step solution.
>
>
>
> Quote from Nicolaus:
>
> “
>
> About your second question: You are right that taking and restoring from
> savepoints will incur a performance loss. They cannot be incremental, and
> cannot use native (low-level) data formats - for now. These issues are on
> the list of things to improve for Flink 1.15, so if the changes make it
> into the release, it may improve a lot.
>
> You can restore a job from a retained checkpoint (provided you configured
> retained checkpoints, else they are deleted on job cancellation), see [1]
> (right below the part you linked). It should be possible to rescale using a
> retained checkpoint, despite the docs suggesting otherwise (it was
> uncertain whether this guarantee should/can be given, so it was not stated
> in the docs. This is also expected to change in the future as it is a
> necessity for further reactive mode development).
>
>
>
> [1] 
> *https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
> *
>
>
>
> ”
>
>
>
>1. For using Standalone Kubernetes problem. I have development a Flink 
> *native
>Kubernetes* operator upon
>https://github.com/wangyang0918/flink-native-k8s-operator
>
> 
>. Right now, this operator can basically achieve everything Flink CLI could
>do for session mode and application mode and more. This includes features
>like rescaling with savepoint (stop with savepoint and start from
>savepoint), stop with savepoint, submit/stop/cancel session jobs etc. All
>of these are automated through a unified Kubernetes CRD. For sake of time,
>I don’t want to write another operator for standalone k8s operator… As a
>result, I am seeking to add the reactive scaling function into this
>operator. Nevertheless, I really appreciate the work for reactive mode in
>standalone Kubernetes.
>
>
>
> Based on Nicolaus’s reply. I think if we configure the retain checkpoint
> policy. By default, I think only one checkpoint will be retained (please
> correct me if I am wrong) and we can capture the directory and rescale the
> application.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
>
>
>
>
>
>1. I removed the [EXTERNAL] tag in the email. This is something
>automatically added by the company’s email box. Sorry for the confusion.
>
>
>
>
>
> Best Regards,
>
> Fuyao
>
>
>
> *From: *David Morávek 
> *Date: *Tuesday, November 2, 2021 at 05:53
> *To: *Fuyao Li 
> *Cc: *user , Yang Wang ,
> Robert 

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Yuxin Tan
Thanks Daisy and Kevin! The IO scheduling idea of the sequential reading
and the benchmark result look really great!  Looking forward to the next
work.

Best,

Yuxin

weijie guo wrote on Wed, Nov 3, 2021 at 5:24 PM:

> It's really an amazing job to fill in the defects of flink in batch
> shuffle. I really appreciate the work done in io scheduling, the sequential
> reading of the shuffle reader can greatly improve the disk IO performance
> and stability. Sort-based shuffle realizes this feature in a concise and
> efficient way. By the way, the default shuffle implementation in the batch
> mode of flink is still hash-based, maybe we can consider using the new
> shuffle implementation by default later. Last but not least, thank Yingjie
> Cao (Kevin) and Daisy Tsang for publishing this blog.
>
> Lijie Wang wrote on Wed, Nov 3, 2021 at 4:17 PM:
>
>> Thanks Daisy and Kevin for bringing this blog, it is very helpful for
>> understanding the principle of sort shuffle.
>>
>>
>> Best,
>>
>> Lijie
>>
>> Guowei Ma wrote on Wed, Nov 3, 2021 at 2:57 PM:
>>
>>>
>>> Thank Daisy& Kevin much for your introduction to the improvement of TM
>>> blocking shuffle, credit base+io scheduling is indeed a very interesting
>>> thing. At the same time, I look forward to this as a default setting for tm
>>> blocking shuffle.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 2:46 PM Gen Luo  wrote:
>>>
 Thanks Daisy and Kevin! The benchmark results look really exciting!

 On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:

> Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd
> part was really interesting, I really like the idea of the single spill
> file with a custom scheduling of read requests.
>
> Best,
> D.
>
> On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang 
> wrote:
>
>> Hey everyone, we have a new two-part post published on the Apache
>> Flink blog about the sort-based blocking shuffle implementation in Flink.
>> It covers benchmark results, design and implementation details, and more!
>> We hope you like it and welcome any sort of feedback on it. :)
>>
>>
>> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
>> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>>
>


[Statefun] Questions on recovery

2021-11-03 Thread Hady Januar Willi
Hi everyone,

When testing Flink statefun, the job eventually throws the following
exception after failing to reach the endpoint or if the endpoint fails
after the exponentially increasing delay.

java.util.concurrent.RejectedExecutionException:
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox$MailboxClosedException:
Mailbox is in state CLOSED, but is required to be in state OPEN for put
operations.

How do I recover from this state?

Thank you.

Regards,
Hady


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread weijie guo
It's really an amazing job to fill in the defects of flink in batch
shuffle. I really appreciate the work done in io scheduling, the sequential
reading of the shuffle reader can greatly improve the disk IO performance
and stability. Sort-based shuffle realizes this feature in a concise and
efficient way. By the way, the default shuffle implementation in the batch
mode of flink is still hash-based, maybe we can consider using the new
shuffle implementation by default later. Last but not least, thank Yingjie
Cao (Kevin) and Daisy Tsang for publishing this blog.

Lijie Wang wrote on Wed, Nov 3, 2021 at 4:17 PM:

> Thanks Daisy and Kevin for bringing this blog, it is very helpful for
> understanding the principle of sort shuffle.
>
>
> Best,
>
> Lijie
>
> Guowei Ma wrote on Wed, Nov 3, 2021 at 2:57 PM:
>
>>
>> Thank Daisy& Kevin much for your introduction to the improvement of TM
>> blocking shuffle, credit base+io scheduling is indeed a very interesting
>> thing. At the same time, I look forward to this as a default setting for tm
>> blocking shuffle.
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 2:46 PM Gen Luo  wrote:
>>
>>> Thanks Daisy and Kevin! The benchmark results look really exciting!
>>>
>>> On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:
>>>
 Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part
 was really interesting, I really like the idea of the single spill file
 with a custom scheduling of read requests.

 Best,
 D.

 On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang 
 wrote:

> Hey everyone, we have a new two-part post published on the Apache
> Flink blog about the sort-based blocking shuffle implementation in Flink.
> It covers benchmark results, design and implementation details, and more!
> We hope you like it and welcome any sort of feedback on it. :)
>
>
> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>



Proactively Push Metrics After Tasks Finished

2021-11-03 Thread kanata163
Hi, all:
As discussed in FLIP-147 [1], checkpoints are supported after tasks finish, but 
metrics are not. As we know, metric reporters report periodically, every 10 
seconds by default.
If sources are bounded, the final metrics may not be reported to a metric system 
such as Pushgateway when the task finishes. This leaves users unable to obtain 
the correct metrics.


I created a JIRA [2] for this before, but at that time Flink's metric system was 
considered to be meant for monitoring a running application. Now that batch and 
stream processing are unified in Flink 1.14, I think it is also important to get 
the metrics of bounded or mixed jobs even after those jobs have finished.


Therefore, we propose to also proactively push metrics after tasks finish.


Thanks, hope for your response.


[1]  
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[2]  https://issues.apache.org/jira/browse/FLINK-22631




 

Re: Reactive mode in 1.13

2021-11-03 Thread Till Rohrmann
Hi Ravi,

I think you can pass the arguments to the job via `./bin/standalone-job.sh
start  -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="3000s"
lib/tornado.jar myArguments`.
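
Inside the job, those trailing arguments arrive in main(String[] args); a small
sketch of picking them up (the parameter name is just an example):

import org.apache.flink.api.java.utils.ParameterTool;

public final class ArgsExample {
    public static void main(String[] args) {
        // e.g. started with: ... lib/tornado.jar --input-topic my-topic
        ParameterTool params = ParameterTool.fromArgs(args);
        String inputTopic = params.get("input-topic", "default-topic");
        System.out.println("input topic: " + inputTopic);
    }
}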

Cheers,
Till

On Wed, Nov 3, 2021 at 5:20 AM Ravi Sankar Reddy Sangana 
wrote:

> Thanks a lot working fine now. Also you also explain how to pass
> parameters to the job. In the session cluster I am passing arguments using
> api.
>
>
>
> Here how can I pass the arguments to the job?
>
>
>
>
>
> Regards,
>
> Ravi Sankar Reddy.
>
>
>
> *From:* Till Rohrmann 
> *Sent:* 02 November 2021 07:33 PM
> *To:* Ravi Sankar Reddy Sangana 
> *Cc:* user 
> *Subject:* Re: Reactive mode in 1.13
>
>
>
> Hi Ravi,
>
>
>
> I think you also need to make the tornado.jar available to the
> TaskExecutor processes (e.g. putting them into the usrlib or lib directory
> where you started the process). When using the application mode, then Flink
> assumes that all processes have access to the user code jar. That's why
> Flink won't ship the user code jars to the other processes unlike when
> using the session cluster mode, for example. The idea is that the user code
> is bundled together with the application cluster deployment.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Nov 2, 2021 at 1:01 PM Ravi Sankar Reddy Sangana <
> ra...@radware.com> wrote:
>
> Set up:
>
>
>
> 1 job manager in 2 core 6 GB
>
> 2 task managers in 4 core 12 GB
>
>
>
> Created fat jar and copied the jar to jobmanager lib folder.
>
>
>
> ./bin/standalone-job.sh start  -Dscheduler-mode=reactive
> -Dexecution.checkpointing.interval="3000s" lib/tornado.jar
>
>
>
> *Build logs in job manager:*
>
>
>
> [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ clive ---
>
> [INFO] Including org.apache.flink:flink-java:jar:1.13.1 in the shaded jar.
>
> [INFO] Including org.apache.flink:flink-core:jar:1.13.1 in the shaded jar.
>
> [INFO] Including org.apache.flink:flink-annotations:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-metrics-core:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-shaded-asm-7:jar:7.1-13.0 in the
> shaded jar.
>
> [INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded
> jar.
>
> [INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded
> jar.
>
> [INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
>
> [INFO] Including commons-collections:commons-collections:jar:3.2.2 in the
> shaded jar.
>
> [INFO] Including org.apache.commons:commons-compress:jar:1.20 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-13.0 in the
> shaded jar.
>
> [INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded
> jar.
>
> [INFO] Including org.apache.commons:commons-math3:jar:3.5 in the shaded
> jar.
>
> [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
>
> [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded
> jar.
>
> [INFO] Excluding org.apache.flink:force-shading:jar:1.13.1 from the shaded
> jar.
>
> [INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.13.1 in
> the shaded jar.
>
> [INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded
> jar.
>
> [INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
>
> [INFO] Including org.lz4:lz4-java:jar:1.6.0 in the shaded jar.
>
> [INFO] Including org.xerial.snappy:snappy-java:jar:1.1.7.3 in the shaded
> jar.
>
> [INFO] Including org.apache.flink:flink-connector-base:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Replacing original artifact with shaded artifact.
>
>
>
> *LOGS:*
>
>
>
> 2021-11-02 11:02:36,224 INFO
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] -
> Restarting job.
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
> ClassLoader info: URL ClassLoader:
>
> Class not resolvable through given classloader.
>
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:154)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at 

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Guowei Ma
Hi Yan,
I do not think it is a bug. We simply cannot assume anything about the input
order of an operator.
Best,
Guowei


On Wed, Nov 3, 2021 at 3:10 PM Yan Shen  wrote:

> Yes, it does not happen in streaming mode. Is this considered a bug or is
> it by design?
>
> Thanks!
>
> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma  wrote:
>
>> Hi
>>
>> I did not run your program directly, but I see that you are now using the
>> Batch execution mode. I suspect it is related to this, because in the Batch
>> execution mode FLINK will "sort" the Key (this might be an unstable sort).
>> So would you like to experiment with the results of running with
>> Streaming mode and to see what happens?
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:
>>
>>> Hi all,
>>>
>>> Can anyone advise on this?
>>>
>>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>>
>>> package aero.airlab.flinkjobs.headingreminder
>>>
>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import kotlin.random.Random
>>>
>>> object CountWindowTest {
>>> @JvmStatic
>>> fun main(args: Array<String>) {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>
>>> val rand = Random(0)
>>> val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>> env.fromCollection(data).assignTimestampsAndWatermarks(
>>> WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>> .withTimestampAssigner { e, _ -> e.second.toLong() })
>>> .keyBy { it.first }
>>> .countWindow(3L, 1)
>>> .reduce { a, b -> b }
>>> .keyBy { it.first }
>>> .filter { it.first == 5 }
>>> .print()
>>>
>>> env.execute()
>>> }
>>> }
>>>
>>>
>>> The beginning of the output is as such
>>>
>>> 12> (5, 184)
>>> 12> (5, 18)
>>> 12> (5, 29)
>>> 12> (5, 37)
>>> 12> (5, 38)
>>> 12> (5, 112)
>>> 12> (5, 131)
>>>
>>> The first line (5, 184) is not in order from the rest.
>>>
>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>> reduce.
>>>
>>> Thanks.
>>>
>>


Re: Question on BoundedOutOfOrderness

2021-11-03 Thread Guowei Ma
Hi Oliver,

I think Alexey is right: you cannot assume that records are emitted in
event-time order. The bounded-out-of-orderness strategy only controls how
watermarks are generated; it does not reorder the records themselves.
One small addition: judging from your output there are actually multiple
parallel subtasks (probably 11), and you also cannot expect the output of
these subtasks to be ordered by event time relative to each other.
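
A minimal sketch of the kind of sort Alexey refers to, assuming the stream is
keyed first (e.g. keyBy(e -> e.id)) and that EventRecord is a Flink POJO; the
class and field names are illustrative, not the exact Ververica example:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers records per key and flushes them in timestamp order once the
// watermark has passed their timestamp.
public class EventTimeSorter extends KeyedProcessFunction<Integer, EventRecord, EventRecord> {

    private transient MapState<Long, List<EventRecord>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "sort-buffer", Types.LONG, Types.LIST(Types.POJO(EventRecord.class))));
    }

    @Override
    public void processElement(EventRecord value, Context ctx, Collector<EventRecord> out) throws Exception {
        long ts = ctx.timestamp(); // the timestamp assigned by the watermark strategy
        if (ts <= ctx.timerService().currentWatermark()) {
            return; // too late to sort; drop it or send it to a side output
        }
        List<EventRecord> sameTs = buffer.get(ts);
        if (sameTs == null) {
            sameTs = new ArrayList<>();
        }
        sameTs.add(value);
        buffer.put(ts, sameTs);
        ctx.timerService().registerEventTimeTimer(ts); // fires once the watermark reaches ts
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<EventRecord> out) throws Exception {
        List<EventRecord> ready = buffer.get(timestamp);
        if (ready != null) {
            ready.forEach(out::collect);
            buffer.remove(timestamp);
        }
    }
}

Applied as source.assignTimestampsAndWatermarks(watermarkStrategy).keyBy(e -> e.id).process(new EventTimeSorter()),
the records of each key come out in event-time order; ordering across
different parallel subtasks is still undefined, as noted above.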

Best,
Guowei


On Wed, Nov 3, 2021 at 6:46 AM Alexey Trenikhun  wrote:

> Hi Oliver,
> I believe you also need to do sort, out of order ness watermark strategy
> only “postpone” watermark for given expected maximum of out of orderness.
> Check Ververica example -
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
>
> Alexey
> --
> *From:* Oliver Moser 
> *Sent:* Tuesday, November 2, 2021 12:52:59 PM
> *To:* user@flink.apache.org 
> *Subject:* Question on BoundedOutOfOrderness
>
> Hi!
>
> I am investigating the use of Flink for a new project and started some
> simple demos.
>
> Currently I am stuck at the point where I need to deal with events
> arriving out of order based on their event time. I’ve spent quite some time
> researching on SO, the docs, the Ververica training (excellent resource
> btw), however, I assume I still run into some conceptual misconceptions :-)
>
> I put together the following demo code, and I would expect that the
> console output would list the events chronologically based on their
> embedded event time. However, events are always printed in the same order
> as they are pushed into the data stream by the OutOfOrderEventSource.
>
> Sample console output:
>
> —
> 3> EventRecord{id=3, counter=1, eventId=0,
> timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1,
> timestamp=2021-11-02T20:10:02.810}
> 5> EventRecord{id=0, counter=3, eventId=0,
> timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1,
> timestamp=2021-11-02T20:10:00.815}
> 7> EventRecord{id=1, counter=5, eventId=1,
> timestamp=2021-11-02T20:10:05.819}
> 8> EventRecord{id=4, counter=6, eventId=0,
> timestamp=2021-11-02T20:10:04.819}
> 9> EventRecord{id=0, counter=7, eventId=1,
> timestamp=2021-11-02T20:10:03.824}
> 10> EventRecord{id=0, counter=8, eventId=1,
> timestamp=2021-11-02T20:10:05.828}
> 11> EventRecord{id=3, counter=9, eventId=1,
> timestamp=2021-11-02T20:10:09.829}
> —
>
> My expectation would be to receive the events ordered:
>
> —
> 5> EventRecord{id=0, counter=3, eventId=0,
> timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1,
> timestamp=2021-11-02T20:10:00.815}
> 3> EventRecord{id=3, counter=1, eventId=0,
> timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1,
> timestamp=2021-11-02T20:10:02.810}
> 9> EventRecord{id=0, counter=7, eventId=1,
> timestamp=2021-11-02T20:10:03.824}
> …
> —
>
>
> Given a BoundedOutOfOrderness watermarking strategy with a 20 seconds
> duration, my expectation would have been that for the first event that is
> pushed to the demo source
>
> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
>
> this would set the initial watermark to "2021-11-02T20:09:41.554”, hence
> events that are older than this timestamp are not considered, but events
> younger than this timestamps are considered and ordering of events happens
> accordingly. That would mean that
>
> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
>
> would still be considered on time.
>
> I’m sure I am missing something conceptually.
>
> Here is the code that I’m using:
>
> ---
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> import java.time.Duration;
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
> import java.time.temporal.ChronoUnit;
> import java.util.Random;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import static java.time.Instant.ofEpochMilli;
> import static java.time.LocalDateTime.ofInstant;
>
> public class SimpleOutOfOrderDemo {
>
>public static void main(String... args) throws Exception {
>   var env = StreamExecutionEnvironment.getExecutionEnvironment();
>   var watermarkStrategy =
>  WatermarkStrategy.forGenerator(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
> .withTimestampAssigner((e, rt) -> e.timestamp);
>
>   var source = new OutOfOrderEventSource();
>
> env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
>   env.execute("Simple Out of Order Demo");
>}
>
>public static class OutOfOrderEventSource implements
> SourceFunction<EventRecord> {
>
>   static final int MAX_ELEMENTS = 10;
>
>   static final long INTERVAL = 1000;
>
>   AtomicInteger counter = new AtomicInteger();
>
>   @Override
>   public void 

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Lijie Wang
Thanks Daisy and Kevin for bringing us this blog; it is very helpful for
understanding how the sort-based shuffle works.


Best,

Lijie

Guowei Ma wrote on Wed, Nov 3, 2021 at 2:57 PM:

>
> Thank Daisy& Kevin much for your introduction to the improvement of TM
> blocking shuffle, credit base+io scheduling is indeed a very interesting
> thing. At the same time, I look forward to this as a default setting for tm
> blocking shuffle.
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 2:46 PM Gen Luo  wrote:
>
>> Thanks Daisy and Kevin! The benchmark results look really exciting!
>>
>> On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:
>>
>>> Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part
>>> was really interesting, I really like the idea of the single spill file
>>> with a custom scheduling of read requests.
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang  wrote:
>>>
 Hey everyone, we have a new two-part post published on the Apache Flink
 blog about the sort-based blocking shuffle implementation in Flink.  It
 covers benchmark results, design and implementation details, and more!  We
 hope you like it and welcome any sort of feedback on it. :)


 https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
 https://flink.apache.org/2021/10/26/sort-shuffle-part2.html

>>>


Re: Why do the count windows in Flink Table APIs require processing time for sorting whereas in Flink Datastream APIs they do not

2021-11-03 Thread Guowei Ma
Hi Long

From the API point of view, this processing-time attribute could be omitted;
requiring it is mainly for unification, i.e. consistency with event-time
scenarios and alignment with the other window APIs.

Thanks Jark Wu  for telling me this offline.

Best,
Guowei


On Wed, Nov 3, 2021 at 11:55 AM Long Nguyễn 
wrote:

> I have read about the Window operator
> 
> in Flink documentation and know that it groups rows into finite groups
> based on time or row-count intervals.
>
> I saw an example of a sliding count window right there
> 
> :
>
> // Sliding Row-count window (assuming a processing-time attribute "proctime")
> .window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
>
> As mentioned in the docs, the on method here is to define:
>
>> The time attribute to group (time interval) or sort (row count) on. For
>> batch queries this might be any Long or Timestamp attribute. For streaming
>> queries this must be a declared event-time or processing-time time
>> attribute.
>
>
> On the other hand, I searched and found this countWindow
> 
>  method
> in Flink's Java docs and saw that it does not specify any time-related
> parameter.
> I'm wondering why a sliding count window in Flink Table APIs requires
> processing time whereas it is unnecessary in the Datastream APIs.
>
> I really appreciate it if someone can clarify this for me.
>
> --
> 
> --
> Nguyen Dich Long,
> School of Information and Communication Technology (SoICT -
> https://www.soict.hust.edu.vn)
> Hanoi University of Science and Technology (https://www.hust.edu.vn)
> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
> Noi, Vietnam
> Tel: +84 (0)3.54.41.76.76
> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
>


Re: Is there a way to update checkpoint configuration for a job "in-place"?

2021-11-03 Thread Guowei Ma
Hi Kevin
If you want to change this configuration (execution.checkpointing.timeout)
without restarting the job, then as far as I know there is no such
mechanism.
But could you consider setting a larger value by default?
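
Not a way around the restart, but for reference a minimal sketch of where
that timeout is chosen when the job is built (the 30-minute figure is only an
example, not a recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                             // checkpoint every minute
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60_000L); // generous timeout picked up front
        // ... build the pipeline and call env.execute() ...
    }
}

The same knob exists as execution.checkpointing.timeout in the configuration,
but either way it only takes effect when the job is (re)started.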

Best,
Guowei


On Wed, Nov 3, 2021 at 5:15 AM Kevin Lam  wrote:

> Hi all,
>
> We run a Flink application on Kubernetes in Application Mode using Kafka
> with exactly-once-semantics and high availability.
>
> We are looking into a specific failure scenario: a flink job that has too
> short a checkpoint timeout (execution.checkpointing.timeout) and at some
> point during the job's execution, checkpoints begin to fail.
>
> Is there a way to update the checkpoint timeout
> (execution.checkpointing.timeout) of this job, in-place ie. without
> creating a new job, or restoring from an old savepoint/checkpoint? Note:
> one idea may be to take a savepoint, and then restore from that savepoint
> with the new configuration, however this is not possible because if
> checkpoints are timing out, so are savepoints and thus save points cannot
> be taken. Are there any other ways to handle this situation?
>
> We want to ensure exactly-once semantics are respected.
>
> Thanks in advance!
>


Re: How to refresh topics to ingest with KafkaSource?

2021-11-03 Thread Martijn Visser
Hi Mason,

I've assigned it to you.

Best regards,

Martijn
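
For readers who find this thread later: periodic topic discovery already
works for pattern-based subscriptions when partition.discovery.interval.ms is
set, as in the sketch below (broker address, pattern and interval are
placeholders); FLINK-24660 is about making the subscriber itself pluggable on
top of this.

import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class TopicDiscoveryExample {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setTopicPattern(Pattern.compile("events-.*"))
                // re-evaluate the pattern (and partitions) every 60 s
                .setProperty("partition.discovery.interval.ms", "60000")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // then: env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
    }
}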

On Tue, 2 Nov 2021 at 23:28, Mason Chen  wrote:

> Hi Arvid,
>
> I have some bandwidth to contribute to this task and am familiar with the
> code. Could you or another committer assign me this ticket?
>
> Thanks,
> Mason
>
> On Oct 30, 2021, at 5:24 AM, Arvid Heise  wrote:
>
> Hi Mason,
>
> thanks for creating that.
>
> We are happy to take contributions (I flagged it as a starter task).
>
> On Wed, Oct 27, 2021 at 2:36 AM Mason Chen  wrote:
>
>> Hi all,
>>
>> I have a similar requirement to Preston. I created
>> https://issues.apache.org/jira/browse/FLINK-24660 to track this effort.
>>
>> Best,
>> Mason
>>
>> On Oct 18, 2021, at 1:59 AM, Arvid Heise  wrote:
>>
>> Hi Preston,
>>
>> if you still need to set KafkaSubscriber explicitly, could you please
>> create a feature request for that? For now, you probably have to resort to
>> reflection hacks and build against the non-public KafkaSubscriber.
>>
>> On Fri, Oct 15, 2021 at 4:03 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Yes you are right.
>>>
>>> We tested recently to find that the flink jobs do not pick up the new
>>> topics that got created with the same pattern provided to flink kafka
>>> consumer.  The topics are set only during the start of the jobs.
>>>
>>> Prasanna.
>>>
>>> On Fri, 15 Oct 2021, 05:44 Preston Price,  wrote:
>>>
 Okay so topic discovery is possible with topic patterns, and maybe
 topic lists. However I don't believe it's possible to change the configured
 topic list, or topic pattern after the source is created.

 On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu 
 wrote:

> There is a setting for dynamic topic discovery
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>
> Best,
>
> Denis
>
> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu 
> wrote:
>
>> Hi,
>>
>> In my experience with the librdkafka client and the Go wrapper, the
>> topic-pattern subscribe is reactive. The Flink Kafka connector might 
>> behave
>> similarly.
>>
>> Best,
>> Denis
>>
>> On Fri, Oct 15, 2021 at 12:34 AM Preston Price 
>> wrote:
>>
>>> No, the topic-pattern won't work for my case. Topics that I should
>>> subscribe to can be enabled/disabled based on settings I read from 
>>> another
>>> system, so there's no way to craft a single regular expression that 
>>> would
>>> fit the state of all potential topics. Additionally the documentation 
>>> you
>>> linked seems to suggest that the regular expression is evaluated only 
>>> once
>>> "when the job starts running". My understanding is it would not pick up 
>>> new
>>> topics that match the pattern after the job starts.
>>>
>>>
>>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng 
>>> wrote:
>>>
 Hi!

 I suppose you want to read from different topics every now
 and then? Does the topic-pattern option [1] in Table API Kafka 
 connector
 meet your needs?

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern

 Preston Price  于2021年10月14日周四 上午1:34写道:

> The KafkaSource, and KafkaSourceBuilder appear to prevent users
> from providing their own KafkaSubscriber. Am I overlooking something?
>
> In my case I have an external system that controls which topics we
> should be ingesting, and it can change over time. I need to add, and 
> remove
> topics as we refresh configuration from this external system without 
> having
> to stop and start our Flink job. Initially it appeared I could 
> accomplish
> this by providing my own implementation of the `KafkaSubscriber` 
> interface,
> which would be invoked periodically as configured by the `
> partition.discovery.interval.ms` property. However there is no
> way to provide my implementation to the KafkaSource since the 
> constructor
> for KafkaSource is package protected, and the KafkaSourceBuilder does 
> not
> supply a way to provide the `KafkaSubscriber`.
>
> How can I accomplish a periodic refresh of the topics to ingest?
>
> Thanks
>
>
>
>>
>> --
>> Regards,
>> Denis Nutiu
>>
>
>
> --
> Regards,
> Denis Nutiu
>

>>
>


Re: Window computation: non-continuous data causes delayed window triggering.

2021-11-03 Thread Gen Luo
The WatermarkGenerator interface has onEvent and onPeriodicEmit; onPeriodicEmit
is invoked periodically, so you could probably implement logic there that emits
a newly computed watermark once onEvent has not been called for a certain amount
of time. If the new watermark is later than the window times corresponding to
the current watermark, it should be able to trigger all pending windows.
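
A minimal sketch of that idea (MyEvent, the 5 s out-of-orderness and the 5 min
idle timeout are placeholders and would need tuning):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Emits max(eventTime) - bound as usual, but when no event has been seen for
// idleTimeoutMillis it advances the watermark based on processing time so
// that the last pending windows can fire.
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long outOfOrdernessMillis = Duration.ofSeconds(5).toMillis();
    private final long idleTimeoutMillis = Duration.ofMinutes(5).toMillis();

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventSeenAt = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events seen yet, nothing to emit
        }
        if (System.currentTimeMillis() - lastEventSeenAt > idleTimeoutMillis) {
            // idle: push the watermark past the open windows so they fire
            output.emitWatermark(new Watermark(maxTimestamp + idleTimeoutMillis));
        } else {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
}

It would be plugged in with WatermarkStrategy.forGenerator(ctx -> new IdleAwareWatermarkGenerator())
plus a timestamp assigner; note that once the watermark has been jumped
forward like this, events that later arrive with smaller timestamps will be
treated as late.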

On Mon, Nov 1, 2021 at 5:20 PM yuankuo.xia 
wrote:

> hi
>
>
> Background: I am doing aggregation with event-time windows, but the data is not continuous. For example, data flows in during both period A and period B, but there is a 30-minute gap with no data between A and B.
>
>
> Problem: because of the gap, the last window of period A is not triggered until new data flows in.
>
>
> Is there a way to solve this, e.g. trigger all windows when no data has arrived for some time? I looked at the Trigger interface but have not come up with a good implementation.


Reply: Window computation: non-continuous data causes delayed window triggering.

2021-11-03 Thread 罗根(夕肆)
The WatermarkGenerator interface has onEvent and onPeriodicEmit; onPeriodicEmit
is invoked periodically, so you could probably implement logic there that emits
a newly computed watermark once onEvent has not been called for a certain amount
of time. If the new watermark is later than the window times corresponding to
the current watermark, it should be able to trigger all pending windows.


--
From: yuankuo.xia
Sent: Monday, November 1, 2021, 17:20
To: user-zh
Subject: Window computation: non-continuous data causes delayed window triggering.

hi


Background: I am doing aggregation with event-time windows, but the data is not continuous. For example, data flows in during both period A and period B, but there is a 30-minute gap with no data between A and B.


Problem: because of the gap, the last window of period A is not triggered until new data flows in.


Is there a way to solve this, e.g. trigger all windows when no data has arrived for some time? I looked at the Trigger interface but have not come up with a good implementation.

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Yan Shen
Yes, it does not happen in streaming mode. Is this considered a bug or is
it by design?

Thanks!

On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma  wrote:

> Hi
>
> I did not run your program directly, but I see that you are now using the
> Batch execution mode. I suspect it is related to this, because in the Batch
> execution mode FLINK will "sort" the Key (this might be an unstable sort).
> So would you like to experiment with the results of running with Streaming
> mode and to see what happens?
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:
>
>> Hi all,
>>
>> Can anyone advise on this?
>>
>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>
>> package aero.airlab.flinkjobs.headingreminder
>>
>> import org.apache.flink.api.common.RuntimeExecutionMode
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import kotlin.random.Random
>>
>> object CountWindowTest {
>> @JvmStatic
>> fun main(args: Array<String>) {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment()
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>
>> val rand = Random(0)
>> val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>> env.fromCollection(data).assignTimestampsAndWatermarks(
>> WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>> .withTimestampAssigner { e, _ -> e.second.toLong() })
>> .keyBy { it.first }
>> .countWindow(3L, 1)
>> .reduce { a, b -> b }
>> .keyBy { it.first }
>> .filter { it.first == 5 }
>> .print()
>>
>> env.execute()
>> }
>> }
>>
>>
>> The beginning of the output is as such
>>
>> 12> (5, 184)
>> 12> (5, 18)
>> 12> (5, 29)
>> 12> (5, 37)
>> 12> (5, 38)
>> 12> (5, 112)
>> 12> (5, 131)
>>
>> The first line (5, 184) is not in order from the rest.
>>
>> Is this a bug? The problem disappears if I remove the keyBy after the
>> reduce.
>>
>> Thanks.
>>
>


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Guowei Ma
Thanks Daisy & Kevin very much for your introduction to the improvements of
the TM blocking shuffle; credit-based flow control plus IO scheduling is
indeed a very interesting approach. I also look forward to this becoming the
default setting for the TM blocking shuffle.

Best,
Guowei


On Wed, Nov 3, 2021 at 2:46 PM Gen Luo  wrote:

> Thanks Daisy and Kevin! The benchmark results look really exciting!
>
> On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:
>
>> Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part
>> was really interesting, I really like the idea of the single spill file
>> with a custom scheduling of read requests.
>>
>> Best,
>> D.
>>
>> On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang  wrote:
>>
>>> Hey everyone, we have a new two-part post published on the Apache Flink
>>> blog about the sort-based blocking shuffle implementation in Flink.  It
>>> covers benchmark results, design and implementation details, and more!  We
>>> hope you like it and welcome any sort of feedback on it. :)
>>>
>>>
>>> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
>>> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>>>
>>


Re: What is Could not retrieve file from transient blob store?

2021-11-03 Thread Guowei Ma
Hi, Smith

It seems that the log file
(blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b)
was deleted for some reason, but AFAIK no one else has reported this
exception (maybe others know what could have happened).
1. If you refresh the page you should see the correct result, because this
triggers another retrieval of the file from the TM.
2. It might also be safer to set a dedicated blob directory path (other
than /tmp) via `blob.storage.directory` [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#blob-storage-directory
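
For example, in flink-conf.yaml (the path below is only a placeholder; it
should be writable by the Flink processes and outside any automatic /tmp
cleanup):

blob.storage.directory: /var/lib/flink/blob-store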


Best,
Guowei


On Wed, Nov 3, 2021 at 7:50 AM John Smith  wrote:

> Hi, I'm running Flink 1.10.0 with 3 zookeepers, 3 job nodes and 3 task
> nodes, and I saw this exception in the job node logs...
> 2021-11-02 23:20:22,703 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Unhandled exception.
> org.apache.flink.util.FlinkException: Could not retrieve file from transient blob store.
> at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:135)
> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does not exist and failed to copy from blob store.
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:516)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
> at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:133)
> ... 9 more
>


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Gen Luo
Thanks Daisy and Kevin! The benchmark results look really exciting!

On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:

> Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part
> was really interesting, I really like the idea of the single spill file
> with a custom scheduling of read requests.
>
> Best,
> D.
>
> On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang  wrote:
>
>> Hey everyone, we have a new two-part post published on the Apache Flink
>> blog about the sort-based blocking shuffle implementation in Flink.  It
>> covers benchmark results, design and implementation details, and more!  We
>> hope you like it and welcome any sort of feedback on it. :)
>>
>>
>> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
>> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>>
>


Re: FlinkSQL writing hive ORC data with the streaming sink: how to control the number of files.

2021-11-03 Thread yidan zhao
Please ignore the new question: it was probably just because the data volume was small. After switching to a larger dataset I can see the in-progress files; visibility is controlled by a leading '.' in the file name.
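
For reference, a sketch of the file-compaction options Caizhi mentions in the
reply quoted below, written as filesystem-connector DDL (column names and path
are placeholders; the option names come from the linked docs, and for a Hive
catalog table the same keys would go into TBLPROPERTIES, so exact support
depends on the Flink version):

CREATE TABLE tmp (
  sid STRING,
  log_line STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`, sid) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///warehouse/tmp',
  'format' = 'orc',
  'auto-compaction' = 'true',        -- merge the small files written within one checkpoint
  'compaction.file-size' = '128MB'   -- target size of the compacted files
);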

yidan zhao wrote on Wed, Nov 3, 2021 at 1:14 PM:

> One more question: the files written by FlinkSQL are not named in the format described in the docs, i.e.:
>
> └── 2019-08-25--12
> ├── prefix-0-0.ext
> ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
> ├── prefix-1-0.ext
> └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
>
> What I actually see is part-8201db8e-36d4-49d2-986d-611f6766b22f-0-351, with no inprogress state; it looks as if the file is only written in one go at checkpoint time?
>
>
> Caizhi Weng wrote on Wed, Nov 3, 2021 at 10:20 AM:
>
>> Hi!
>>
>> The hive sink has a file compaction feature that can merge the data of the same partition within one checkpoint into a single file. See [1]
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction
>>
>> yidan zhao wrote on Wed, Nov 3, 2021 at 10:03 AM:
>>
>> > Requirement
>> > Suppose my hive table is tmp, with a number of columns, partitioned by
>> > dt, hour and sid, where sid identifies the channel.
>> >
>> > I currently use FlinkSQL to read data from a kafka table and write it
>> > into the hive table tmp with the streaming sink, using the commit
>> > policies metastore and success-file, a process-time commit trigger, and
>> > a delay of 1h.
>> > Checkpoints are triggered every 1 min with 10 min between two consecutive
>> > checkpoints, so essentially a checkpoint is taken every 10 min.
>> >
>> > Current situation
>> > Since the data volume is large, the kafka topic has 60 partitions, so the
>> > job parallelism can be anything up to 60; assume I also choose 60.
>> > At any point in time there is of course only 1 dt, basically only 1 hour,
>> > and, say, 10 sids.
>> > The file counts are:
>> > Every 10 minutes: 10 (sid) * 60 (parallelism) = 600 files.
>> > Each hour has six 10-minute intervals (i.e. 6 checkpoints), so about 6000
>> > files.
>> > As above, roughly 6000 files are generated per hour, and only ever more,
>> > never fewer, once the rolling policy etc. is taken into account.
>> >
>> > What I need now: since different sids carry very different data volumes,
>> > I would like a low-volume sid to be consumed by only 1 subtask, so that
>> > the partition of that sid gets 1 file per 10 minutes instead of 60.
>> > A high-volume sid would still be consumed by multiple parallel subtasks.
>> > The rough idea is like first doing a keyBy on sid in the datastream API
>> > (this may of course cause data skew, which I can handle myself, e.g. by
>> > spreading a high-volume sid into sid+randomInt) and then consuming and
>> > writing to hive with the streamingSink.
>> >
>> > Can the above idea be implemented with both the datastream API and
>> > flinkSQL?
>> >
>> > Currently I see that insert into tmp select ... from kafka_tmp; generates
>> > 2 tasks by default, a kafkaSource+streamSink (chained together) plus a
>> > partition committer, which definitely does not meet the need.
>> >
>>
>