Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-17 Thread Yang Wang
Thanks Tao for providing your internal use case.

I have created a ticket for this feature [1].

[1]. https://issues.apache.org/jira/browse/FLINK-24332

Best,
Yang

tao xiao  wrote on Sat, Sep 11, 2021 at 10:18 AM:

> Thanks David for the tips. We have been running Flink with no observed
> performance degradation in EMR (which is EBS-backed) for more than 1 year,
> therefore we believe we can get the same performance in Kubernetes.
>
> On Sat, Sep 11, 2021 at 3:13 AM David Morávek  wrote:
>
>> OT: Beware that even if you manage to solve this, EBS is replicated
>> network storage, therefore rocksdb performance will be affected
>> significantly.
>>
>> Best,
>> D.
>>
>> On Fri 10. 9. 2021 at 16:19, tao xiao  wrote:
>>
>>> The use case we have is to store the RocksDB sst files in EBS. The EC2
>>> instance type (m5) we use doesn't provide local disk storage therefore EBS
>>> is the only option to store the local sst file.
>>>
>>> On Fri, Sep 10, 2021 at 7:10 PM Yang Wang  wrote:
>>>
 I am afraid Flink does not support creating a dedicated PVC for each
 TaskManager pod right now.
 But I think it might be a reasonable requirement.

 Could you please share why you need to mount a persistent volume claim
 per TaskManager?
 AFAIK, the TaskManager will be deleted once it fails. You expect the
 PVC to also be deleted. Right?


 Best,
 Yang

 Xiaolong Wang  wrote on Fri, Sep 10, 2021 at 2:37 PM:

> Hi,
>
>I'm facing a tough question. I want to start a Flink Native
> Kubernetes job with each of the task manager pods mounted with an aws-ebs
> PVC.
>
>   My first thought was to use the pod-template file to do this, but it soon
> hit a dead end: since the pod-template is the same for every task manager
> pod, how can I mount different PVCs?
>
>This issue is quite puzzling, could you please help me?
>
> Thanks in advance !
>

>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
>


Re: RocksDB state not cleaned up

2021-09-17 Thread tao xiao
Thanks for the feedback! However, the TTL approach has already proven that the
state cannot be cleaned up on time due to too many levels built up in RocksDB.

Hi @Yun Tang  do you have any suggestions to tune RocksDB
to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek  wrote:

> Cleaning up with timers should solve this. Both approaches have some
> advantages and disadvantages though.
>
> Timers:
> - No "side effects".
> - Can be set in event time. Deletes are regular tombstones that will get
> compacted later on.
>
> TTL:
> - Performance. This costs literally nothing compared to an extra state for
> timer + writing a tombstone marker.
> - Has "side-effects", because it works in processing time. This is just
> something to keep in mind, e.g. when bootstrapping the state from historical
> data (large event time / processing time skew).
>
> With the 1.14 release, we've bumped the RocksDB version so it may be possible
> to use "periodic compaction" [1], but nobody has tried that so far. In
> the meantime I think there is no real workaround because we don't expose a
> way to trigger manual compaction.
>
> I'm off to vacation until 27th and I won't be responsive during that time.
> I'd like to pull Yun into the conversation as he's super familiar with the
> RocksDB state backend.
>
> [1]
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
>
> Best,
> D.
>
> On Fri, Sep 17, 2021 at 5:17 AM tao xiao  wrote:
>
>> Hi David,
>>
>> Confirmed with the RocksDB log that Stephan's observation is the root cause:
>> compaction doesn't clean up the high-level SST files fast enough. Do you
>> think manual clean-up by registering a timer is the way to go, or can any
>> RocksDB parameter be tuned to mitigate this issue?
>>
>> On Wed, Sep 15, 2021 at 12:10 AM tao xiao  wrote:
>>
>>> Hi David,
>>>
>>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>>> where we have too many levels, like fast-growing state, as compaction
>>> doesn't clean up high-level SST files in time. Is this correct? If yes,
>>> should we register a timer with the TTL time and manually clean up the state
>>> (state.clear()) when the timer fires?
>>>
>>> I will turn on RocksDB logging as well as compaction logging [1] to
>>> verify this
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>
>>>
>>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:
>>>
 Hi Tao,

 my intuition is that the compaction of SST files is not triggering. By
 default, it's only triggered by the size ratios of different levels [1] and
 the TTL mechanism has no effect on it.

 Some reasoning from Stephan:

 It's very likely to have large files in higher levels that haven't been
> compacted in a long time and thus just stay around.
>
> This might be especially possible if you insert a lot in the beginning
> (build up many levels) and then have a moderate rate of modifications, so
> the changes and expiration keep happening purely in the merges /
> compactions of the first levels. Then the later levels may stay unchanged
> for quite some time.
>

 You should be able to see compaction details by setting RocksDB logging
 to INFO [2]. Can you please check these and validate whether this really is
 the case?

 [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
 [2]
 https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

 Best,
 D.

 On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:

> Hi team
>
> We have a job that uses value state with RocksDB and TTL set to 1 day.
> The TTL update type is OnCreateAndWrite. We set the value state when the
> value state doesn't exist and we never update it again after the state is
> not empty. The key of the value state is timestamp. My understanding of
> such TTL settings is that the size of all SST files remains flat (let's
> disregard the impact space amplification brings) after 1 day as the daily
> data volume is more or less the same. However, the RocksDB native metrics
> show that the SST files have kept growing since I started the job. I checked
> the SST files in local storage and I can see SST files that are about 1 month
> old (from when I started the job). What is the possible reason for the SST
> files not being cleaned up?
>
> The Flink version is 1.12.1
> State backend is RocksDB with incremental checkpoint
> All default configuration for RocksDB
> Per job mode in Yarn and checkpoint to S3
>
>
> Here is the code to set value state
>
> public void open(Configuration parameters) {
> StateTtlConfig ttlConfigClick = StateTtlConfig
> .newBuilder(Time.days(1))
> .setUpdateType(S

Re: Built-in functions to manipulate MULTISET type

2021-09-17 Thread JING ZHANG
Hi Kai,
AFAIK, there is no built-in function to extract the keys in a MULTISET to
be an ARRAY. Defining a UDF is a good solution.
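
For illustration, a minimal sketch of such a wrapper UDF (the function, column
and table names are hypothetical; it assumes a MULTISET<STRING> argument is
passed to eval as a java.util.Map from element to count):

import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Returns the keys of a MULTISET<STRING> as an ARRAY<STRING>.
public class MultisetKeysFunction extends ScalarFunction {
    public String[] eval(@DataTypeHint("MULTISET<STRING>") Map<String, Integer> multiset) {
        if (multiset == null) {
            return new String[0];
        }
        return multiset.keySet().toArray(new String[0]);
    }
}

// Hypothetical registration and usage:
// tableEnv.createTemporarySystemFunction("MULTISET_KEYS", MultisetKeysFunction.class);
// tableEnv.executeSql("SELECT MULTISET_KEYS(some_multiset_column) FROM some_table");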

Best,
JING ZHANG

Kai Fu  wrote on Sat, Sep 18, 2021 at 7:35 AM:

> Hi team,
>
> We want to know if there is any built-in function to extract the keys in
> MULTISET to be an ARRAY. There is no such function as far as we can find,
> except to define a simple wrapper UDF for that, please advise.
>
> --
> *Best wishes,*
> *- Kai*
>


Re: Invalid flink-config keeps going and ignores bad config values

2021-09-17 Thread Yangze Guo
AFAIK there is not. Flink will just skip the invalid lines.

Best,
Yangze Guo

On Sat, Sep 18, 2021 at 7:00 AM Dan Hill  wrote:
>
> Hi.  I noticed my flink-config.yaml had an error in it.  I assumed a bad 
> config would stop Flink from running (to catch errors earlier).  Is there a 
> way I can enable a strict parsing mode so any Flink parsing issue causes 
> Flink to fail?  I don't see one when looking at the code.
>
> 2021-09-17 22:45:18,893 WARN  
> org.apache.flink.configuration.GlobalConfiguration   [] - Error while 
> trying to split key and value in configuration file 
> /opt/flink/conf/flink-conf.yaml:29: "
> taskmanager.memory.network.fraction=0.2"
>
> 2021-09-17 22:45:18,894 WARN  
> org.apache.flink.configuration.GlobalConfiguration   [] - Error while 
> trying to split key and value in configuration file 
> /opt/flink/conf/flink-conf.yaml:30: "taskmanager.memory.network.max=2g"


Built-in functions to manipulate MULTISET type

2021-09-17 Thread Kai Fu
Hi team,

We want to know if there is any built-in function to extract the keys in
MULTISET to be an ARRAY. There is no such function as far as we can find,
except to define a simple wrapper UDF for that, please advise.

-- 
*Best wishes,*
*- Kai*


Invalid flink-config keeps going and ignores bad config values

2021-09-17 Thread Dan Hill
Hi.  I noticed my flink-config.yaml had an error in it.  I assumed a bad
config would stop Flink from running (to catch errors earlier).  Is there a
way I can enable a strict parsing mode so any Flink parsing issue
causes Flink to fail?  I don't see one when looking at the code.

2021-09-17 22:45:18,893 WARN  org.apache.flink.configuration.GlobalConfiguration [] -
Error while trying to split key and value in configuration file
/opt/flink/conf/flink-conf.yaml:29: "taskmanager.memory.network.fraction=0.2"

2021-09-17 22:45:18,894 WARN  org.apache.flink.configuration.GlobalConfiguration [] -
Error while trying to split key and value in configuration file
/opt/flink/conf/flink-conf.yaml:30: "taskmanager.memory.network.max=2g"


RE: hdfs lease issues on flink retry

2021-09-17 Thread Shah, Siddharth
Hi Matthias,

Thanks for looking into the issue and creating a ticket. I am thinking of 
having a workaround until the issue is fixed.

What if I create the attempt directories with a random int by patching 
HadoopOutputFormatBase’s open() method?

Original:

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt___r_"
  + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
  + Integer.toString(taskNumber + 1)
  + "_0");


Patched:

int attemptRandomPrefix = new Random().nextInt(999);

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
  + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
  + Integer.toString(attemptRandomPrefix) + "_r_"
  + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
  + Integer.toString(taskNumber + 1)
  + "_0");


So basically I am creating a directory named attempt__0123_r_0001_0 instead of
attempt___r_0001_0. I have tested this on a handful of our jobs and it seems to be
working fine. I just wanted to check for any downside of this change that I may not
be aware of?

Thanks,
Siddharth



From: Matthias Pohl 
Sent: Tuesday, September 07, 2021 5:06 AM
To: Shah, Siddharth [Engineering] 
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] 

Subject: Re: hdfs lease issues on flink retry

Just for documentation purposes: I created FLINK-24147 [1] to cover this issue.

[1] 
https://issues.apache.org/jira/browse/FLINK-24147

On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl 
mailto:matth...@ververica.com>> wrote:
I see - I should have checked my mailbox before answering. I received the email 
and was able to login.

On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl 
mailto:matth...@ververica.com>> wrote:
The link doesn't work, i.e. I'm redirected to a login page. It would be also 
good to include the Flink logs and make them accessible for everyone. This way 
others could share their perspective as well...

On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] 
mailto:siddharth.x.s...@gs.com>> wrote:
Hi Matthias,

Thank you for responding and taking time to look at the issue.

Uploaded the yarn lags here: 
https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/ 
and have also requested read permissions for you. Please let us know if you’re 
not able to see the files.


From: Matthias Pohl mailto:matth...@ververica.com>>
Sent: Thursday, August 26, 2021 9:47 AM
To: Shah, Siddharth [Engineering] 
mailto:siddharth.x.s...@ny.email.gs.com>>
Cc: user@flink.apache.org; Hailu, Andreas 
[Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>
Subject: Re: hdfs lease issues on flink retry

Hi Siddharth,
thanks for reaching out to the community. This might be a bug. Could you share 
your Flink and YARN logs? This way we could get a better understanding of 
what's going on.

Best,
Matthias

On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] 
mailto:siddharth.x.s...@gs.com>> wrote:
Hi  Team,

We are seeing transient failures in the jobs, mostly in those requiring higher resources
and using Flink RestartStrategies [1]. Upon checking the YARN logs we have observed HDFS
lease issues when the Flink retry happens. The job originally fails on the first try with
PartitionNotFoundException or NoResourceAvailableException, but on retry it seems from the
YARN logs that the lease for the temp sink directory has not yet been released by the node
from the previous try.

Initial Failure log message:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate enough slots to run the job. Please make sure that the 
cluster has enough resources.
at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apach

Re: Building a flink connector

2021-09-17 Thread Martijn Visser
Hi Lars,

We're actually working on creating a guide to help our users with how to
create a connector. Perhaps it would be good to use your needs to see what
we need to include in such a guide, so we can make that available to the
community. It would be great if we could have a conversation on that topic.

Best regards,

Martijn

On Fri, 17 Sept 2021 at 17:08, Yuval Itzchakov  wrote:

> Hi Lars,
>
> We've built a custom connector for Snowflake (both source and sink).
>
> Feel free to reach out in private if you have any questions.
>
> On Fri, Sep 17, 2021, 14:33 Lars Skjærven  wrote:
>
>> We're in need of a Google Bigtable flink connector. Do you have any tips
>> on how this could be done, e.g. general guidelines on how to write a
>> connector ?
>>
>> Thanks,
>> Lars
>>
>


Re: Building a flink connector

2021-09-17 Thread Yuval Itzchakov
Hi Lars,

We've built a custom connector for Snowflake (both source and sink).

Feel free to reach out in private if you have any questions.

On Fri, Sep 17, 2021, 14:33 Lars Skjærven  wrote:

> We're in need of a Google Bigtable flink connector. Do you have any tips
> on how this could be done, e.g. general guidelines on how to write a
> connector ?
>
> Thanks,
> Lars
>


Re: Cleaning old incremental checkpoint files

2021-09-17 Thread Yun Tang
Hi Robin,

You could use Checkpoints#loadCheckpointMetadata [1] to analyze the checkpoint
metadata.

For the problem of making checkpoints self-contained, you might be interested in
ticket [2].


[1] 
https://github.com/apache/flink/blob/8debdd06be0e917610c50a77893f7ade45cee98f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99
[2] https://issues.apache.org/jira/browse/FLINK-24149
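
For reference, a minimal sketch of what such an analysis could look like (the
_metadata path is hypothetical, and Checkpoints is an internal runtime class,
so this is not a stable public API):

import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class InspectCheckpoint {
    public static void main(String[] args) throws Exception {
        // Hypothetical local copy of a retained checkpoint's _metadata file.
        String path = "/tmp/chk-42/_metadata";
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            CheckpointMetadata metadata =
                    Checkpoints.loadCheckpointMetadata(in, InspectCheckpoint.class.getClassLoader(), path);
            // Each operator state references the (possibly shared) files still in use.
            for (OperatorState operatorState : metadata.getOperatorStates()) {
                System.out.println(operatorState);
            }
        }
    }
}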

Best
Yun Tang

From: Robin Cassan 
Sent: Tuesday, September 7, 2021 20:17
To: Yun Tang 
Cc: Robert Metzger ; user 
Subject: Re: Cleaning old incremental checkpoint files

Hey Yun, thanks for the answer!

How would you analyze the checkpoint metadata? Would you build a program with 
the State Processor API library, or is there a better way to do it?
I believe the option you mention would indeed facilitate cleaning; it would
still be manual (because we can't set up a periodic deletion), but at least we
could safely remove old folders with this option.

Thanks,
Robin

On Fri, Sep 3, 2021 at 18:21, Yun Tang
mailto:myas...@live.com>> wrote:
Hi Robin,

It's not easy to clean incremental checkpoints as different job instances have
different checkpoint sub-directories (due to different job ids). You could
analyze your checkpoint metadata to see which files are still in use in the older
checkpoint directory.

BTW, I am also thinking of a possible solution to provide the ability to re-upload
all files under some specific configured option, so that the new job could get
decoupled from older checkpoints. Do you think that could resolve your case?

Best
Yun Tang

From: Robin Cassan 
mailto:robin.cas...@contentsquare.com>>
Sent: Wednesday, September 1, 2021 17:38
To: Robert Metzger mailto:rmetz...@apache.org>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Cleaning old incremental checkpoint files

Thanks Robert for your answer, this seems to be what we observed when we tried 
to delete the first time: Flink complained about missing files.
I'm wondering then how people are cleaning their storage for incremental
checkpoints. Is there any guarantee when using TTLs that after the TTL has
expired, no file older than the TTL will be needed in the shared folder?

On Tue, Aug 3, 2021 at 13:29, Robert Metzger
mailto:rmetz...@apache.org>> wrote:
Hi Robin,

Let's say you have two checkpoints #1 and #2, where #1 has been created by an
old version of your job, and #2 has been created by the new version.
When can you delete #1?
In #1, there's a directory "/shared" that contains data that is also used by 
#2, because of the incremental nature of the checkpoints.

You can not delete the data in the /shared directory, as this data is 
potentially still in use.

I know this is only a partial answer to your question. I'll try to find out 
more details and extend my answer later.


On Thu, Jul 29, 2021 at 2:31 PM Robin Cassan 
mailto:robin.cas...@contentsquare.com>> wrote:
Hi all!

We've happily been running a Flink job in production for a year now, with the 
RocksDB state backend and incremental retained checkpointing on S3. We often 
release new versions of our jobs, which means we cancel the running one and 
submit another while restoring the previous jobId's last retained checkpoint.

This works fine, but we also need to clean old files from S3 which are starting 
to pile up. We are wondering two things:
- once the newer job has restored the older job's checkpoint, is it safe to 
delete it? Or will the newer job's checkpoints reference files from the older 
job, in which case deleting the old checkpoints might cause errors during the 
next restore?
- also, since all our state has a 7 days TTL, is it safe to set a 7 or 8 days 
retention policy on S3 which would automatically clean old files, or could we 
still need to retain files older than 7 days even with the TTL?

Don't hesitate to ask me if anything is not clear enough!

Thanks,
Robin


Re: Flink S3A failed to connect to service endpoint from IntelliJ IDE

2021-09-17 Thread Yun Gao
Hi James,

For one thing, it looks to me that we should not configure the credentials in
pom.xml; instead, we might introduce a core-site.xml on the classpath and
configure it like:

<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>...</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>...</value>
  </property>
</configuration>
I tried the above method with the attached program and it should work.

Besides, from the attached exception there also seems to be some network problem.
Could you change the credential configuration and try again? If the problem is
still there, could you have a look at whether S3 is accessible?

Best,
Yun
 --Original Mail --
Sender:James Kim 
Send Date:Thu Sep 16 02:24:41 2021
Recipients:Flink ML 
Subject:Flink S3A failed to connect to service endpoint from IntelliJ IDE
I'm trying to write Java code on IntelliJ IDE to make use of the Table API and 
the data I'm using is going to be from a CSV file over s3a. The IDE project is 
in Maven and has a pom.xml like the following:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>flink-ecs-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <property>
            <name>fs.s3a.access.key</name>
            <value>myAccessKey</value>
        </property>
        <property>
            <name>fs.s3a.secret.key</name>
            <value>mySecretKey</value>
        </property>
        <property>
            <name>fs.s3a.endpoint</name>
            <value>myEndPoint</value>
        </property>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.13.2</version>
        </dependency>
    </dependencies>
</project>

And my Main.java class is as the following:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                //"create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}

When I build and run directly from the IDE, I get an error saying a couple of
things:
- INFO: Error when creating PropertyDescriptor for public final void 
org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
 Ignoring this property.
- com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
- Caused by: java.net.NoRouteToHostException: No route to host (Host 
unreachable)
- Caused by: java.net.SocketTimeoutException: connect timed out

How can I set up Flink directly in the IntelliJ IDE and fix this issue?

Re: RocksDB state not cleaned up

2021-09-17 Thread David Morávek
Cleaning up with timers should solve this. Both approaches have some
advantages and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for
timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just
something to keep in mind, e.g. when bootstrapping the state from historical
data (large event time / processing time skew).
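
For illustration, a minimal sketch of the timer-based variant (the class and
key names are hypothetical, and the Click value class is assumed from the
user's code later in this thread; the entry is written once and explicitly
cleared when the timer fires):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ClickDedupFunction extends KeyedProcessFunction<Long, Click, Click> {

    private transient ValueState<Click> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", Click.class));
    }

    @Override
    public void processElement(Click click, Context ctx, Collector<Click> out) throws Exception {
        if (clickState.value() == null) {
            clickState.update(click);
            // Schedule cleanup one day after the first write (an event-time timer works as well).
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 24 * 60 * 60 * 1000L);
            out.collect(click);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Click> out) throws Exception {
        // Explicit delete: RocksDB writes a tombstone that a later compaction removes.
        clickState.clear();
    }
}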

With the 1.14 release, we've bumped the RocksDB version so it may be possible
to use "periodic compaction" [1], but nobody has tried that so far. In
the meantime I think there is no real workaround because we don't expose a
way to trigger manual compaction.
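
If someone wants to experiment with that on 1.14, a minimal, untested sketch of
a custom RocksDB options factory enabling periodic compaction might look like
the following (the one-day interval is an arbitrary example, not a recommendation):

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Ask RocksDB to rewrite SST files that have not taken part in a compaction for a day.
        return currentOptions.setPeriodicCompactionSeconds(TimeUnit.DAYS.toSeconds(1));
    }
}

// Attached via e.g. EmbeddedRocksDBStateBackend#setRocksDBOptions(new PeriodicCompactionOptionsFactory()).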

I'm off to vacation until 27th and I won't be responsive during that time.
I'd like to pull Yun into the conversation as he's super familiar with the
RocksDB state backend.

[1]
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao  wrote:

> Hi David,
>
> Confirmed with the RocksDB log that Stephan's observation is the root cause:
> compaction doesn't clean up the high-level SST files fast enough. Do you
> think manual clean-up by registering a timer is the way to go, or can any
> RocksDB parameter be tuned to mitigate this issue?
>
> On Wed, Sep 15, 2021 at 12:10 AM tao xiao  wrote:
>
>> Hi David,
>>
>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>> where we have too many levels, like fast-growing state, as compaction
>> doesn't clean up high-level SST files in time. Is this correct? If yes,
>> should we register a timer with the TTL time and manually clean up the state
>> (state.clear()) when the timer fires?
>>
>> I will turn on RocksDB logging as well as compaction logging [1] to
>> verify this
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>
>>
>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:
>>
>>> Hi Tao,
>>>
>>> my intuition is that the compaction of SST files is not triggering. By
>>> default, it's only triggered by the size ratios of different levels [1] and
>>> the TTL mechanism has no effect on it.
>>>
>>> Some reasoning from Stephan:
>>>
>>> It's very likely to have large files in higher levels that haven't been
 compacted in a long time and thus just stay around.

 This might be especially possible if you insert a lot in the beginning
 (build up many levels) and then have a moderate rate of modifications, so
 the changes and expiration keep happening purely in the merges /
 compactions of the first levels. Then the later levels may stay unchanged
 for quite some time.

>>>
>>> You should be able to see compaction details by setting RocksDB logging
>>> to INFO [2]. Can you please check these and validate whether this really is
>>> the case?
>>>
>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>> [2]
>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>>>
 Hi team

 We have a job that uses value state with RocksDB and TTL set to 1 day.
 The TTL update type is OnCreateAndWrite. We set the value state when the
 value state doesn't exist and we never update it again after the state is
 not empty. The key of the value state is timestamp. My understanding of
 such TTL settings is that the size of all SST files remains flat (let's
 disregard the impact space amplification brings) after 1 day as the daily
 data volume is more or less the same. However, the RocksDB native metrics
 show that the SST files have kept growing since I started the job. I checked
 the SST files in local storage and I can see SST files that are about 1 month
 old (from when I started the job). What is the possible reason for the SST files
 not being cleaned up?

 The Flink version is 1.12.1
 State backend is RocksDB with incremental checkpoint
 All default configuration for RocksDB
 Per job mode in Yarn and checkpoint to S3


 Here is the code to set value state

 public void open(Configuration parameters) {
 StateTtlConfig ttlConfigClick = StateTtlConfig
 .newBuilder(Time.days(1))
 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
 
 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
 .cleanupInRocksdbCompactFilter(300_000)
 .build();
 ValueStateDescriptor clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
 clickStateDescriptor.enableTimeToLive(ttlConfigClick);
 clickState = getRuntimeContext().getState(clickStateDescriptor);
 }

Exception by flink kafka

2021-09-17 Thread Ragini Manjaiah
Hi,
In what scenarios do we hit java.lang.OutOfMemoryError: Java heap space
while publishing to Kafka? I hit this exception and as a resolution added
the property .setProperty("security.protocol","SSL"); in the Flink
application.

Later I started encountering org.apache.kafka.common.errors.TimeoutException:
Failed to update metadata after 6 ms.

The Flink application consumes data from a topic and processes it into 3 Kafka
topics. Could someone share some insights on this?


I face this exception intermittently and the job terminates.

I am using FlinkKafkaProducer010 with these properties set

producerConfig.setProperty(COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
producerConfig.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30");
producerConfig.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "200");
producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "52428800");
producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "900");
producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "524288000");
producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "19");
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0");
producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647");
producerConfig.setProperty("security.protocol","SSL");


Building a flink connector

2021-09-17 Thread Lars Skjærven
We're in need of a Google Bigtable flink connector. Do you have any tips on
how this could be done, e.g. general guidelines on how to write a connector
?

Thanks,
Lars


Re: Re: Savepoints with bootstraping a datastream function

2021-09-17 Thread Yun Gao
Hi Rakshit,

I think FLIP-147 might still not be able to support this case: for bounded
jobs, it lets each task exit after a checkpoint so that the remaining data is
committed, but it cannot ensure that all the tasks exit after the same
checkpoint; and it does not support taking a savepoint after all the tasks
have finished.

We are also thinking about whether we could support this case in some way.
For now, I think we still need the workaround Arvid pointed out, namely to
coordinate all the sources to wait until a final checkpoint is completed
before exiting.

Best,
yun



 --Original Mail --
Sender:Rakshit Ramesh 
Send Date:Fri Sep 17 17:20:40 2021
Recipients:Arvid Heise 
CC:user 
Subject:Re: Savepoints with bootstraping a datastream function

Hi Arvid.
I went through the code, confluence and jira on FLIP-147.
I couldn't determine if it's possible to manually trigger a 
savepoint/checkpoint as 
I couldn't find any javadoc apis for the same.
Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still create a 
checkpoint
if my entire bounded job finishes before the checkpoint interval? 

On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh  
wrote:

That's great news!
Thanks.

On Tue, 13 Jul 2021 at 14:17, Arvid Heise  wrote:

The only known workaround is to provide your own source(function) that doesn't 
finish until all of the source subtasks finish and a final checkpoint is 
completed. However, coordinating the sources with the old SourceFunction 
interface requires some external tool.

FLIP-147 is targeted for 1.14 in August.

On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh  
wrote:

Hi Arvid,
Since I'm trying to save checkpoints for a bounded process
the checkpoint isn't being written on time since the job finishes before that 
can happen.

Looks like one major feature that would be required for this to work is 
discussed in FLIP-147
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Is there any userland workaround for this?

Thanks!

On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh  
wrote:

Yes! I was only worried about the jobid changing and the checkpoint being 
un-referenceable. 
But since I can pass a path to the checkpoint that will not be an issue.


Thanks a lot for your suggestions!

On Thu, 8 Jul 2021 at 11:26, Arvid Heise  wrote:

Hi Rakshit,

It sounds to me as if you don't need the Savepoint API at all. You can 
(re)start all applications with the previous state (be it retained checkpoint 
or savepoint). You just need to provide the path to that in your application 
invocation [1] (every entry point has such a parameter, you might need to check 
the respective documentation if you are not using CLI). Note that although it 
only says savepoint, starting from a checkpoint is fine as well (just not 
recommended in the beginning).

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
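
For example, with the CLI this is the -s flag (paths below are hypothetical; a
retained checkpoint directory works the same way as a savepoint path):

./bin/flink run -s s3://my-bucket/checkpoints/<old-job-id>/chk-42 my-job.jar
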
On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh  
wrote:

Sorry for being a little vague there.
I want to create a Savepoint from a DataStream right before the job is finished 
or cancelled.
What you have shown in the IT case is how a datastream can be bootstrapped with 
state that is
formed by means of a DataSet. 
My jobs are triggered by a scheduler periodically (every day) using the api and 
I would like 
to bootstrap each day's job with the state of the previous day.

But thanks for the input on the Checkpoint behaviour wrt a FINISHED state, 
I think that will work for me.

Thanks!

On Thu, 8 Jul 2021 at 02:03, Arvid Heise  wrote:

I don't quite understand your question. You use Savepoint API to create a 
savepoint with a batch job (that's why it's DataSet Transform currently). That 
savepoint can only be restored through a datastream application. Dataset 
applications cannot start from a savepoint.

So I don't understand why you see a difference between "restoring a savepoint 
to a datastream" and "create a NewSavepoint for a datastream". It's ultimately 
the same thing for me. Just to be very clear: the main purpose of Savepoint API 
is to create the initial state of a datastream application.
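
For illustration, a minimal sketch of creating a new savepoint with the State
Processor API (Flink 1.12/1.13 style; Click, ClickBootstrapper, the operator
uid and the paths are hypothetical), which a DataStream job can then restore from:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class BootstrapJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical: read whatever should become the initial state.
        DataSet<Click> initialState = env.fromElements(new Click());

        BootstrapTransformation<Click> transformation = OperatorTransformation
                .bootstrapWith(initialState)
                .keyBy(click -> click.key)             // hypothetical key field
                .transform(new ClickBootstrapper());   // extends KeyedStateBootstrapFunction

        Savepoint.create(new MemoryStateBackend(), 128) // 128 = max parallelism
                .withOperator("click-state-uid", transformation)
                .write("s3://my-bucket/new-savepoint");

        env.execute("bootstrap savepoint");
    }
}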

For your second question, yes retained checkpoints outlive the job in all 
regards. It's the users responsibility to eventually clean that up.



On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh  
wrote:

Yes I could understand restoring a savepoint to a datastream.
What I couldn't figure out is to create a NewSavepoint for a datastream.
What I understand is that NewSavepoints only take in Bootstrap transformation 
for Dataset Transform functions.


About the checkpoints, does 
 CheckpointConfig.ExternalizedCheckpointCleanup = RETAIN_ON_CANCELLATION 
offer the same behaviour when the job is "FINISHED" and not "CANCELLED" ?

What I'm looking for is a way to retain the state for a bounded job so that the 
state is reloaded on the next job run (through api).


Re: Savepoints with bootstraping a datastream function

2021-09-17 Thread Rakshit Ramesh
Hi Arvid.
I went through the code, confluence and jira on FLIP-147.
I couldn't determine if it's possible to manually trigger a
savepoint/checkpoint as
I couldn't find any javadoc apis for the same.
Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still
create a checkpoint
if my entire bounded job finishes before the checkpoint interval?

On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh 
wrote:

> That's great news!
> Thanks.
>
> On Tue, 13 Jul 2021 at 14:17, Arvid Heise  wrote:
>
>> The only known workaround is to provide your own source(function) that
>> doesn't finish until all of the source subtasks finish and a final
>> checkpoint is completed. However, coordinating the sources with the old
>> SourceFunction interface requires some external tool.
>>
>> FLIP-147 is targeted for 1.14 in August.
>>
>> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <
>> rakshit.ram...@datakaveri.org> wrote:
>>
>>> Hi Arvid,
>>> Since I'm trying to save checkpoints for a bounded process
>>> the checkpoint isn't being written on time since the job finishes before
>>> that can happen.
>>>
>>> Looks like one major feature that would be required for this to work is
>>> discussed in FLIP-147
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>
>>> Is there any userland workaround for this?
>>>
>>> Thanks!
>>>
>>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
 Yes! I was only worried about the jobid changing and the checkpoint
 being un-referenceable.
 But since I can pass a path to the checkpoint that will not be an issue.


 Thanks a lot for your suggestions!

 On Thu, 8 Jul 2021 at 11:26, Arvid Heise  wrote:

> Hi Rakshit,
>
> It sounds to me as if you don't need the Savepoint API at all. You can
> (re)start all applications with the previous state (be it retained
> checkpoint or savepoint). You just need to provide the path to that in 
> your
> application invocation [1] (every entry point has such a parameter, you
> might need to check the respective documentation if you are not using 
> CLI).
> Note that although it only says savepoint, starting from a checkpoint is
> fine as well (just not recommended in the beginning).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>
> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> Sorry for being a little vague there.
>> I want to create a Savepoint from a DataStream right before the job
>> is finished or cancelled.
>> What you have shown in the IT case is how a datastream can be
>> bootstrapped with state that is
>> formed by means of a DataSet.
>> My jobs are triggered by a scheduler periodically (every day) using
>> the api and I would like
>> to bootstrap each day's job with the state of the previous day.
>>
>> But thanks for the input on the Checkpoint behaviour wrt a FINISHED
>> state,
>> I think that will work for me.
>>
>> Thanks!
>>
>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise  wrote:
>>
>>> I don't quite understand your question. You use Savepoint API to
>>> create a savepoint with a batch job (that's why it's DataSet Transform
>>> currently). That savepoint can only be restored through a datastream
>>> application. Dataset applications cannot start from a savepoint.
>>>
>>> So I don't understand why you see a difference between "restoring a
>>> savepoint to a datastream" and "create a NewSavepoint for a datastream".
>>> It's ultimately the same thing for me. Just to be very clear: the main
>>> purpose of Savepoint API is to create the initial state of a datastream
>>> application.
>>>
>>> For your second question, yes retained checkpoints outlive the job
>>> in all regards. It's the users responsibility to eventually clean that 
>>> up.
>>>
>>>
>>>
>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
 Yes I could understand restoring a savepoint to a datastream.
 What I couldn't figure out is to create a NewSavepoint for a
 datastream.
 What I understand is that NewSavepoints only take in Bootstrap
 transformation for Dataset Transform functions.


 About the checkpoints, does
  CheckpointConfig.ExternalizedCheckpointCleanup =
 RETAIN_ON_CANCELLATION
 offer the same behaviour when the job is "FINISHED" and not
 "CANCELLED" ?

 What I'm looking for is a way to retain the state for a bounded job
 so that the state is reloaded on the next job run (through api).

 On Wed, 7 Jul 2021 at 14:18, Arvid Heise  wrote: