flink not able to get scheme for S3

2021-08-05 Thread tarun joshi
Hey All,

I am running Flink in Docker containers (image tag:
flink:scala_2.11-java11) on EC2 and getting the following exception as I try to
submit a job through the local ./opt/flink/bin:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: No FileSystem for scheme "s3"
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at
org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
at
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more

This is how I am invoking the Flink built-in S3 plugins for the JobManager
and TaskManager:

docker run \
  --rm \
  --volume /root/:/root/ \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=jobmanager \
  --network flink-network \
  --publish 8081:8081 \
  flink:scala_2.11-java11 jobmanager &

docker run \
  --rm \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=taskmanager_0 \
  --network flink-network \
  flink:scala_2.11-java11 taskmanager &

This is how I am defining dependencies in my pom.xml (I am working on the
flink-examples project from the Flink GitHub repo).


   
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${project.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.12.0</version>
</dependency>

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-column</artifactId>
  <version>1.12.0</version>
</dependency>

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.12.0</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>3.3.1</version>
</dependency>


I am also able to see the plugins being loaded for the JobManager and TaskManager:

Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.13.1.jar
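For reference, ENABLE_BUILT_IN_PLUGINS is roughly equivalent to copying the jars into their own
folders under the plugins directory by hand; a rough sketch, assuming the official image layout
where the optional jars live under /opt/flink/opt:

mkdir -p /opt/flink/plugins/s3-fs-hadoop /opt/flink/plugins/s3-fs-presto
cp /opt/flink/opt/flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins/s3-fs-hadoop/
cp /opt/flink/opt/flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins/s3-fs-presto/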

Let me know if I am doing anything wrong.

Thanks for the help!


[ANNOUNCE] Apache Flink 1.13.2 released

2021-08-05 Thread Yun Tang

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.13.2, which is the second bugfix release for the Apache Flink 1.13 
series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/news/2021/08/06/release-1.13.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Yun Tang


Re: Implement task local recovery on TaskManager restart for Signifyd

2021-08-05 Thread Srinivasulu Punuru
Adding Sonam

Hi Colman,

I work on the Streaming SQL team at LinkedIn with Sonam, and we have this in
our backlog. We are interested in solving this as well. If you are equally
interested, we can try to collaborate and solve this problem together.

Thanks,
Srini.


On Wed, Aug 4, 2021 at 8:05 PM Colman OSullivan <
colman.osulli...@signifyd.com> wrote:

> Hello!
>
> At Signifyd we use machine learning to protect our customers from credit
> card fraud. Efficiently calculating feature values for our models based on
> historical data is one of the primary challenges we face, and we’re meeting
> it with Flink.
>
> We need our system to be highly available and quickly recover large
> per-TaskManager state, even in the event of TaskManager failure. Sadly
> judging by this thread,
> task-local recovery isn’t currently supported for this, even when
> TaskManagers are guaranteed to be using the same persistent storage.
>
> That same thread also proposes a couple of solutions, the simplest of
> which is to persist the slot allocations of a TaskExecutor and use it to
> re-initialize a TaskExecutor on restart, so that it can offer its slots to
> the jobs it remembers.
>
> We’d love to have this functionality sooner rather than later, so I’d like
> to know if anyone experienced with Flink development is interested in
> implementing this on our behalf as a paid project and contributing it to
> mainline Flink?
>
> If so, please get in touch, making sure to CC nikhil.bys...@signifyd.com,
> nia.schm...@signifyd.com and myself.
>
> Thanks!
>
> Colman O'Sullivan
>
> Software Engineering Manager,
>
> Link^ Team, Signifyd.
>
> ^ The ‘f’ is silent ;)
>


Re: Table API Throws Calcite Exception CannotPlanException When Tumbling Window is Used

2021-08-05 Thread JING ZHANG
Hi Joe,
Window TVFs are supported since Flink 1.13; 1.12 does not support them yet.
Please upgrade to 1.13, or use the old Group Window Aggregate [1]
syntax in 1.12 (a sketch follows the link below).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
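For reference, a minimal, untested sketch of the posted query rewritten with the old
group-window syntax on 1.12 (assuming proctime is the processing-time attribute of
lead_buffer, as in the original query):

tableEnv.executeSql(
    "SELECT TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start, " +
    "       TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS window_end, " +
    "       AVG(state_ts) " +
    "FROM lead_buffer " +
    "GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)").print();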

Best,
JING ZHANG


Joseph Lorenzini  wrote on Fri, Aug 6, 2021 at 6:15 AM:

> Hi all,
>
>
>
> I am on flink 1.12.3. I am trying to get a tumbling window work with the
> table API as documented here:
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#tumble
>
>
>
> I have a kafka topic as a flink source. I convert the stream into a table
> using the StreamTableEnvironment#fromDataStream method. Then, once the
> table is registered, I attempt to execute this Table API SQL:
>
>
>
> SELECT window_start, window_end, avg(state_ts)
>
> FROM TABLE(TUMBLE(TABLE lead_buffer, DESCRIPTOR(proctime), INTERVAL '1'
> MINUTES))
>
>GROUP BY window_start, window_end
>
>
>
> However, this exception is thrown:
>
>
>
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> There are not enough rules to produce a node with desired properties:
> convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any,
> MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE],
> UpdateKindTraitDef=[NONE]
>
>
>
>
>
> Does anyone have any idea about what I might be doing wrong here?
>
>
>
> Thanks,
>
> Joe
>


Re: Best Practice of Using HashSet State

2021-08-05 Thread Yun Tang
Hi Jerome,

The value, list and map state types describe the structure of the value associated with the 
primary key. I am not sure what set structure you mean here; if you want the value to 
behave as a set, you can just leverage map state (see the sketch below). As you might know, 
Java actually uses a HashMap to implement HashSet.
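A minimal sketch of that idea, using the MapState keys as the set members (the String element
type and the dedup logic are only illustrative; it has to run on a keyed stream):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DistinctFunction extends RichFlatMapFunction<String, String> {

    // Used as a set: only the keys matter, the Boolean value is a dummy.
    private transient MapState<String, Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen-set", Types.STRING, Types.BOOLEAN));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (!seen.contains(value)) {       // set "contains"
            seen.put(value, Boolean.TRUE); // set "add"
            out.collect(value);
        }
    }
}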

Best
Yun Tang

From: Jerome Li 
Sent: Friday, August 6, 2021 7:57
To: user@flink.apache.org 
Subject: Best Practice of Using HashSet State


Hi,



I am new to Flink and state backend. I find Flink does provide ValueState, 
ListState, and MapState. But it does not provide State object for HashSet. What 
is the best practice of storing HashSet State in Flink? Should we use 
ValueState and set the value to be HashSet? Or should we use ListState and 
implement a wrapper class for serializing and deserializing the HashSet to a List?



Any help would be appreciated!



Best,

Jerome


Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Yun Tang
Hi Sandeep,

If you set flink-statebackend-rocksdb to provided scope, your application jar should not 
include the org.rocksdb classes. Have you checked your application jar 
package directly, just as I described?


Best
Yun Tang

From: Sandeep khanzode 
Sent: Friday, August 6, 2021 2:04
To: Stephan Ewen 
Cc: user ; Yun Tang 
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

Hello Stephan, Yun,

Thanks for your insights.

All I have added is this:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>


No other library is explicitly added. My assumption, as mentioned, is that the 
flink-dist.jar already contains the relevant classes and the app or parent 
class loader loads the RocksDB classes. All other Flink dependencies are 
packaged as Maven provided.

Moving to parent-first gives the Spring Framework serialisation issues … I will 
take a look at that …

I thought it would be simpler to simply specify Bloom Filters as an option …

Maybe, I will have to remove Spring dependency …


Thanks,
Sandip



On 05-Aug-2021, at 5:55 PM, Stephan Ewen 
mailto:se...@apache.org>> wrote:

@Yun Tang

Our FRocksDB has the same java package names (org.rocksdb.). Adding 
'org.rocksdb' to parent-first patterns ensures it will be loaded only once, and 
not accidentally multiple times (as Child-first classloading does).

The RocksDB code here is a bit like Flink internal components, which we always 
force parent-first (everything that starts with "org.apache.flink.").

To use RocksDB from the application jar, I think you would need to remove the 
RocksDB state backend from the classpath (lib folder), or you get exactly the 
error reported above.

I cannot think of a downside to add RocksDB to the parent first patterns.

On Thu, Aug 5, 2021 at 10:04 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Stephan,

Since we use our own FRocksDB instead of the original RocksDB as dependency, I 
am not sure whether this problem has relationship with this. From my knowledge, 
more customers would include Flink classes within the application jar package, 
and it might cause problems if the client has different flink version with 
servers.


Best,
Yun Tang

From: Stephan Ewen mailto:se...@apache.org>>
Sent: Wednesday, August 4, 2021 19:10
To: Yun Tang mailto:myas...@live.com>>
Cc: Sandeep khanzode mailto:sand...@shiftright.ai>>; 
user mailto:user@flink.apache.org>>
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

@Yun Tang Does it make sense to add RocksDB to the "always parent-first 
options" to avoid these kind of errors when users package apps incorrectly?
My feeling is that these packaging errors occur very frequently.


On Wed, Aug 4, 2021 at 10:41 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Sandeep,

Did you include the RocksDB classes in the application jar package? You can 
unpack your jar package to check whether they exist.
If so, since RocksDB classes are already included in the flink-dist package, 
you don't need to include them in your jar package (maybe you explicitly added 
the dependency of org.rocksdb:rocksdbjni in your pom).

Best
Yun Tang

From: Sandeep khanzode mailto:sand...@shiftright.ai>>
Sent: Wednesday, August 4, 2021 11:54
To: user mailto:user@flink.apache.org>>
Subject: Bloom Filter - RocksDB - LinkageError Classloading

Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

public DBOptions createDBOptions(DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
return currentOptions.setMaxOpenFiles(1024);
}

public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection<AutoCloseable> handlesToClose) {
BloomFilter bloomFilter = new BloomFilter();
handlesToClose.add(bloomFilter);

return currentOptions
.setTableFormatConfig(
new 
BlockBasedTableConfig().setFilter(bloomFilter));
}
 });

This is in the main class where we set up the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...

Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-conf.yaml … but then I get different issues for the basic/core Spring 
Framework classes not being serializable …

Any help will be appreciated.

Table API Throws Calcite Exception CannotPlanException When Tumbling Window is Used

2021-08-05 Thread Joseph Lorenzini




Hi all,
 
I am on flink 1.12.3. I am trying to get a tumbling window work with the table API as documented here:

 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#tumble
 
I have a Kafka topic as a Flink source. I convert the stream into a table using the StreamTableEnvironment#fromDataStream method. Then, once the table is registered, I attempt to execute this Table API
 SQL:
 
SELECT window_start, window_end, avg(state_ts)  
FROM TABLE(TUMBLE(TABLE lead_buffer, DESCRIPTOR(proctime), INTERVAL '1' MINUTES))

   GROUP BY window_start, window_end
 
However, this exception is thrown:
 
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any,
 MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]
 
 
Does anyone have any idea about what I might be doing wrong here?

 
Thanks,
Joe 





Best Practice of Using HashSet State

2021-08-05 Thread Jerome Li
Hi,

I am new to Flink and state backend. I find Flink does provide ValueState, 
ListState, and MapState. But it does not provide State object for HashSet. What 
is the best practice of storing HashSet State in Flink? Should we use 
ValueState and set the value to be HashSet? Or should we use ListState and 
implement a wrapper class for serializing and deserializing the HashSet to a List?

Any help would be appreciated!

Best,
Jerome


Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Sandeep khanzode
Hello Stephan, Yun,

Thanks for your insights.

All I have added is this:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>


No other library is explicitly added. My assumption, as mentioned, is that the 
flink-dist.jar already contains the relevant classes and the app or parent 
class loader loads the RocksDB classes. All other Flink dependencies are 
packaged as Maven provided.

Moving to parent-first gives the Spring Framework serialisation issues … I will 
take a look at that …

I thought it would be simpler to simply specify Bloom Filters as an option … 

Maybe, I will have to remove Spring dependency … 


Thanks,
Sandip



> On 05-Aug-2021, at 5:55 PM, Stephan Ewen  wrote:
> 
> @Yun Tang
> 
> Our FRocksDB has the same java package names (org.rocksdb.). Adding 
> 'org.rocksdb' to parent-first patterns ensures it will be loaded only once, 
> and not accidentally multiple times (as Child-first classloading does).
> 
> The RocksDB code here is a bit like Flink internal components, which we 
> always force parent-first (everything that starts with "org.apache.flink.").
> 
> To use RocksDB from the application jar, I think you would need to remove the 
> RocksDB state backend from the classpath (lib folder), or you get exactly the 
> error reported above.
> 
> I cannot think of a downside to add RocksDB to the parent first patterns.
> 
> On Thu, Aug 5, 2021 at 10:04 AM Yun Tang  > wrote:
> Hi Stephan,
> 
> Since we use our own FRocksDB instead of the original RocksDB as dependency, 
> I am not sure whether this problem has relationship with this. From my 
> knowledge, more customers would include Flink classes within the application 
> jar package, and it might cause problems if the client has different flink 
> version with servers.
> 
> 
> Best,
> Yun Tang
> From: Stephan Ewen mailto:se...@apache.org>>
> Sent: Wednesday, August 4, 2021 19:10
> To: Yun Tang mailto:myas...@live.com>>
> Cc: Sandeep khanzode mailto:sand...@shiftright.ai>>; 
> user mailto:user@flink.apache.org>>
> Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading
>  
> @Yun Tang Does it make sense to add RocksDB to the "always parent-first 
> options" to avoid these kind of errors when users package apps incorrectly?
> My feeling is that these packaging errors occur very frequently.
> 
> 
> On Wed, Aug 4, 2021 at 10:41 AM Yun Tang  > wrote:
> Hi Sandeep,
> 
> Did you include the RocksDB classes in the application jar package? You can 
> unpack your jar package to check whether they exist.
> If so, since RocksDB classes are already included in the flink-dist package, 
> you don't need to include them in your jar package (maybe you explicitly 
> added the dependency of org.rocksdb:rocksdbjni in your pom).
> 
> Best
> Yun Tang
> From: Sandeep khanzode mailto:sand...@shiftright.ai>>
> Sent: Wednesday, August 4, 2021 11:54
> To: user mailto:user@flink.apache.org>>
> Subject: Bloom Filter - RocksDB - LinkageError Classloading
>  
> Hello,
> 
> I tried to add the bloom filter functionality as mentioned here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html
>  
> 
> 
>  rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
> 
>   public DBOptions createDBOptions(DBOptions currentOptions, 
> Collection<AutoCloseable> handlesToClose) {
>   return currentOptions.setMaxOpenFiles(1024);
>   }
> 
>   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
> currentOptions, Collection<AutoCloseable> handlesToClose) {
>   BloomFilter bloomFilter = new BloomFilter();
>   handlesToClose.add(bloomFilter);
> 
>   return currentOptions
>   .setTableFormatConfig(
>   new 
> BlockBasedTableConfig().setFilter(bloomFilter));
>   }
>  });
> 
> This is in the main class where we set up the StreamExecutionEnvironment …
> 
> I get ClassLoading errors due to that ...
> Caused by: java.lang.LinkageError: loader constraint violation: loader 
> org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
> org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
> previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
> module of loader 'app')
> 
> 
> What is documented is to change the order to parent-first in the 
> flink-conf.yaml … but then I get different issues for the basic/core Spring 
> Framework classes not being serializable …
> 
> Any help will be appreciated.
> 
> Thanks,
> Sandip



write into parquet with variable number of columns

2021-08-05 Thread Sharipov, Rinat
Hi mates !

I'm trying to find the best way to persist data into columnar format
(parquet) using Flink.
Each event contains a fixed list of properties and a variable list of
properties, defined by the user.
And I would like to save user-defined properties into separate columns on
the fly.

Here is an example of my events:

[
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "gender": "male",
  "geo": "Germany"
}
  },
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "car": "big-cool-car",
  "phone": "iphone"
}
  }
]

As a result, I would like to have a table with the following columns

eventId | timestamp | gender | geo | car | phone

I've looked into streaming file sink, but it requires defining a schema
before starting a job, which is not possible in my case.
I've also remembered the explode SQL function that could help me with
standard SQL, but it doesn't exist in the Flink Table API.

I have found that since version 1.13.0 Flink supports creation of rows by
name using Row.withNames(), so I guess this can be
the key that solves my problem. Here is what the Javadoc says:

Name-based field mode: withNames() creates a variable-length row. The
fields can be accessed by name using getField(String) and setField(String,
Object). Every field is initialized during the first call to
setField(String, Object) for the given name. However, the framework will
initialize missing fields with null and reorder all fields once more type
information is available during serialization or input conversion. Thus,
even name-based rows eventually become fixed-length composite types with a
deterministic field order. Name-based rows perform worse than
position-based rows but simplify row creation and code readability.

So it seems that all I need is to transform my event into a record manually
and persist the resulting table into a file system, but my no-op demo
example fails with an exception; here it is:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestApp {

  public static void main(String[] args) {

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Row row1 = Row.withNames();
    row1.setField("a", "fb1");
    row1.setField("b", "gmail1");
    row1.setField("c", "vk1");

    Row row2 = Row.withNames();
    row2.setField("b", "ok2");
    row2.setField("c", "gmail2");

    tableEnv.fromValues(row1, row2).printSchema();
  }
}

Here is a stack trace of the exception:

java.lang.IllegalArgumentException: Accessing a field by position is
not supported in name-based field mode.

   at org.apache.flink.types.Row.getField(Row.java:257)
   at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
   at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
   at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
   at org.apache.flink.table.expressions.ApiExpressionUtils.convertRow(ApiExpressionUtils.java:123)
   at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:103)
   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
   at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
   at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
   at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:359)
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:334)


Maybe someone has tried this feature and can guess what's wrong with
the current code and how to make it work.

Anyway I have a fallback - accumulate a batch of events, define the
schema for them and write into the file system manually, but I still hope
that I can do this in a more elegant way.
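In case it is useful, a sketch of that explicit-schema direction applied to the demo rows
above, using org.apache.flink.table.api.DataTypes and the tableEnv from the example (the
column names and STRING types are assumptions):

Table t = tableEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("a", DataTypes.STRING()),
                DataTypes.FIELD("b", DataTypes.STRING()),
                DataTypes.FIELD("c", DataTypes.STRING())),
        Row.of("fb1", "gmail1", "vk1"),
        Row.of(null, "ok2", "gmail2"));
t.printSchema();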

Thx for your advice and time !


-- 
Best Regards,
Sharipov Rinat

CleverDATA
make your data clever


Using POJOs with the table API

2021-08-05 Thread Alexis Sarda-Espinosa
Hi everyone,

I had been using the DataSet API until now, but since that's been deprecated, I 
started looking into the Table API. In my DataSet job I have a lot of POJOs, 
all of which are even annotated with @TypeInfo and provide the corresponding 
factories. The Table API documentation talks about POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have 
encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were 
validation exceptions because the specified class is abstract. I added this to 
the class and got over that:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class)
)

Then there were exceptions about the output type. Since it's also a POJO, I 
thought this would help:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator::class),
output = DataTypeHint("RAW", bridgedTo =  MyDTO.class)
)

But no luck: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.

I figured I would try something simpler and first return a List from my 
AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if 
that's documented, but I looked through LogicalTypeParser and used:

output = DataTypeHint("ARRAY")

But that throws an exception (see attachment): Table program cannot be 
compiled. This is a bug. Please file an issue.

I changed the List to String[] and that finally worked.

Even getting a simple test running was difficult. I simply could not get 
TableEnvironment#fromValues to work with POJOs as input, and I tried many 
combinations of DataTypes.of(MyPojo.class)

At this point I still don't know how to return complex data structures 
encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.

org.apache.flink.table.api.TableException: Failed to wait job finish

at 
org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:130)
at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
at com.E2ETest.tablePlayground(E2ETest.kt:149)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at 
org.junit.jupit

Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Stephan Ewen
@Yun Tang

Our FRocksDB has the same java package names (org.rocksdb.). Adding
'org.rocksdb' to parent-first patterns ensures it will be loaded only once,
and not accidentally multiple times (as Child-first classloading does).

The RocksDB code here is a bit like Flink internal components, which we
always force parent-first (everything that starts with "org.apache.flink.").

To use RocksDB from the application jar, I think you would need to remove
the RocksDB state backend from the classpath (lib folder), or you get
exactly the error reported above.

I cannot think of a downside to add RocksDB to the parent first patterns.
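In the meantime, a user-side workaround along those lines is a one-line addition to
flink-conf.yaml (a sketch, not a tested setup):

classloader.parent-first-patterns.additional: org.rocksdb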

On Thu, Aug 5, 2021 at 10:04 AM Yun Tang  wrote:

> Hi Stephan,
>
> Since we use our own FRocksDB instead of the original RocksDB as
> dependency, I am not sure whether this problem has relationship with this.
> From my knowledge, more customers would include Flink classes within the
> application jar package, and it might cause problems if the client has
> different flink version with servers.
>
>
> Best,
> Yun Tang
> --
> *From:* Stephan Ewen 
> *Sent:* Wednesday, August 4, 2021 19:10
> *To:* Yun Tang 
> *Cc:* Sandeep khanzode ; user <
> user@flink.apache.org>
> *Subject:* Re: Bloom Filter - RocksDB - LinkageError Classloading
>
> @Yun Tang Does it make sense to add RocksDB to the "always parent-first
> options" to avoid these kind of errors when users package apps incorrectly?
> My feeling is that these packaging errors occur very frequently.
>
>
> On Wed, Aug 4, 2021 at 10:41 AM Yun Tang  wrote:
>
> Hi Sandeep,
>
> Did you include the RocksDB classes in the application jar package? You
> can unpack your jar package to check whether they exist.
> If so, since RocksDB classes are already included in the flink-dist
> package, you don't need to include them in your jar package (maybe you
> explicitly added the dependency of org.rocksdb:rocksdbjni in your pom).
>
> Best
> Yun Tang
> --
> *From:* Sandeep khanzode 
> *Sent:* Wednesday, August 4, 2021 11:54
> *To:* user 
> *Subject:* Bloom Filter - RocksDB - LinkageError Classloading
>
> Hello,
>
> I tried to add the bloom filter functionality as mentioned here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html
>
>  rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
>
>   public DBOptions createDBOptions(DBOptions currentOptions, 
> Collection<AutoCloseable> handlesToClose) {
>   return currentOptions.setMaxOpenFiles(1024);
>   }
>
>   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
> currentOptions, Collection<AutoCloseable> handlesToClose) {
>   BloomFilter bloomFilter = new BloomFilter();
>   handlesToClose.add(bloomFilter);
>
>   return currentOptions
>   .setTableFormatConfig(
>   new 
> BlockBasedTableConfig().setFilter(bloomFilter));
>   }
>  });
>
>
> This is in the main class where we set up the StreamExecutionEnvironment
> …
>
> I get ClassLoading errors due to that ...
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader 
> org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
> org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
> previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
> module of loader 'app')
>
>
>
> What is documented is to change the order to parent-first in the
> flink-conf.yaml … but then I get different issues for the basic/core Spring
> Framework classes not being serializable …
>
> Any help will be appreciated.
>
> Thanks,
> Sandip
>
>


Re:Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties

2021-08-05 Thread 纳兰清风



Hi suman,


FlinkKafkaConsumer.setStartFromLatest() means you always consume messages 
from the latest offset whenever you restart the Flink job; the consumer ignores any 
committed group offsets.
auto.offset.reset=latest means the consumer fetches messages from the latest 
offset only if you never committed any offsets before; it is the fallback strategy used when 
the consumer does not find any offsets on the brokers.
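A small sketch to make the difference concrete (topic, servers and group id are placeholders;
the imports are java.util.Properties, org.apache.flink.api.common.serialization.SimpleStringSchema
and org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// Only consulted when the group has no committed offsets at all.
props.setProperty("auto.offset.reset", "latest");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// Always start from the latest offsets on a fresh start, ignoring committed
// group offsets; offsets restored from a checkpoint/savepoint still take precedence.
consumer.setStartFromLatest();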










At 2021-08-05 06:35:05, "suman shil"  wrote:

In my flink streaming application I have kafka datasource. I am using the kafka 
property auto.offset.reset=latest. I am wondering if I need to use 
FlinkKafkaConsumer.setStartFromLatest(). Are they similar? Can I use either of 
them? Following is the documentation from flink code. 
/**
 * Specifies the consumer to start reading from the latest offset for all 
partitions. This lets
 * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * This method does not affect where partitions are read from when the 
consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or savepoint,
 * only the offsets in the restored state will be used.
 *
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}



Thanks

Re: Flink k8 HA mode + checkpoint management

2021-08-05 Thread Yang Wang
FLINK-19358[1] might be related and we already have some discussion there.

[1]. https://issues.apache.org/jira/browse/FLINK-19358

Best,
Yang

Yun Tang  wrote on Wed, Aug 4, 2021 at 11:50 AM:

> Hi Harsh,
>
> The job id would be fixed as  if using HA
> mode with native k8s, which means the checkpoint path should stay the same
> no matter how many times you submit.
> However, if HA mode is enabled, the new job would first recover from the
> HA checkpoint store to recover the last checkpoint. In other words, your
> new job should recover from last checkpoint-1. From your exceptions, we can
> see the job did not recover successfully and started from scratch.
> That's why you hit the exception saying the checkpoint meta file already
> exists.
>
> There would be two reasons for this:
>
>1. The HA checkpoint store did not recover successfully, you could
>check whether the checkpoint 1 is completed in the previous run.
>2. The last checkpoint-1 finished to store on the remote checkpoint
>path but fail to add to the checkpoint store. However, the checkpoint
>coordinator would clean up the checkpoint meta if failed to add to
>checkpoint store [1] unless your job crashed or meet the
>PossibleInconsistentStateException [2].
>
> I think you should check the jobmanager log of your last run to know the
> root cause.
>
> [1]
> https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1233
> [2]
> https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1226
>
>
> Best
> Yun Tang
> --
> *From:* Manong Karl 
> *Sent:* Wednesday, August 4, 2021 9:17
> *To:* Harsh Shah 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink k8 HA mode + checkpoint management
>
> Can You please share your configs? I'm using native kubernetes without HA
> and there's no issues. I'm curious how this happens.  AFAIK jobid is
> generated randomly.
>
>
> Harsh Shah  于2021年8月4日周三 上午2:44写道:
>
> Hello,
>
> I am trying to use Flink HA mode inside kubernetes in standalone mode.
> The Job ID is always constant, "". In
> situation where we restart the job (Not from a check-point or savepoint),
> we see errors like
> """
>
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> '/flink-checkpoints//chk-1/_metadata' 
> already exists
>
> """
> where checkpoints have not been created since the restart of Job .
>
> My question:
> * Is the recommended way to set a new unique "checkpoint path" every time
> we update Job and restart necessary k8 resources (say not restarted from
> checkpoint-savepoint)? Or GC checkpoints during deletion and reload from
> savepoint if required? Looking for a standard recommendation.
> * Is there a way I can override the JobID to be unique and indicate it is
> a complete restart in HA mode?
>
>
> Thanks,
> Harsh
>
>


Re: Avro SpecificRecordBase question

2021-08-05 Thread Flavio Pompermaier
Hi Kirill, as far as I know SpecificRecordBase should work in Flink, I
don't know if there's any limitation in StateFun.
It seems that the typeClass is passed to generateFieldsFromAvroSchema from
the PravegaDeserializationSchema..
Maybe the pravega.LoadsSource does not bind the Avro classes and
their schema correctly (i.e. a serializer factory)?
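For what it's worth, a minimal sketch of the "concrete type" route that the quoted article
describes (MyEvent stands for any generated Avro class extending SpecificRecordBase, and
source for any DataStream<MyEvent>; both names are placeholders):

DataStream<MyEvent> events = source
        .map(e -> e)                                  // any user function that produces MyEvent
        .returns(new AvroTypeInfo<>(MyEvent.class));  // org.apache.flink.formats.avro.typeutils.AvroTypeInfo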

Best,
Flavio

On Thu, Aug 5, 2021 at 2:06 AM Kirill Kosenko <
kirill.kose...@transportexchangegroup.com> wrote:

> Hello
>
> I read in this article
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> that it's possible to use SpecificRecordBase.class in the operators:
>
> Avro Specific
> Avro specific records will be automatically detected by checking that the
> given type’s type hierarchy contains the SpecificRecordBase class. You can
> either specify your concrete Avro type, or—if you want to be more generic
> and allow different types in your operator—use the SpecificRecordBase type
> (or a subtype) in your user functions, in
> ResultTypeQueryable#getProducedType(), or in
> SingleOutputStreamOperator#returns(). Since specific records use generated
> Java code, they are strongly typed and allow direct access to the fields
> via known getters and setters.
>
> If I specify SpecificRecordBase.class in the SourceFunction/SinkFunction I
> get an exception:
>
> Caused by: java.lang.IllegalStateException: Expecting type to be a 
> PojoTypeInfo
> at 
> org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:71)
>  ~[?:?]
> at 
> org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:55)
>  ~[?:?]
> at 
> org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.createAvroTypeInfo(AvroKryoSerializerUtils.java:81)
>  ~[?:?]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1897)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1798)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:970)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:799)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:746)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:742)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:210)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> io.pravega.connectors.flink.serialization.PravegaDeserializationSchema.<init>(PravegaDeserializationSchema.java:64)
>  ~[?:?]
> at 
> org.apache.flink.statefun.pravega.LoadsSource.createLoadsSource(LoadsSource.java:41)
>  ~[?:?]
> at 
> org.apache.flink.statefun.pravega.EmbeddedModule.configure(EmbeddedModule.java:55)
>  ~[?:?]
> at 
> org.apache.flink.statefun.flink.core.spi.Modules.createStatefulFunctionsUniverse(Modules.java:70)
>  ~[statefun-flink-core.jar:3.0.0]
> at 
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses$ClassPathUniverseProvider.get(StatefulFunctionsUniverses.java:43)
>  ~[statefun-flink-core.jar:3.0.0]
> at 
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses.get(StatefulFunctionsUniverses.java:32)
>  ~[statefun-flink-core.jar:3.0.0]
> at 
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
>  ~[statefun-flink-core.jar:3.0.0]
> at 
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
>  ~[statefun-flink-core.jar:3.0.0]
>
>
> Did I miss something?
>
> Is it possible to use SpecificRecordBase.class as a
> SourceFunction/SinkFunction type?
>
> Hope it was clear
>
> Thanks
>
> --
> Best regards,
> Kirill Kosenko
>


Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Yun Tang
Hi Stephan,

Since we use our own FRocksDB instead of the original RocksDB as a dependency, I 
am not sure whether this problem is related to that. From my knowledge, 
more customers would include Flink classes within the application jar package, 
and it might cause problems if the client has a different Flink version from the 
servers.


Best,
Yun Tang

From: Stephan Ewen 
Sent: Wednesday, August 4, 2021 19:10
To: Yun Tang 
Cc: Sandeep khanzode ; user 
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

@Yun Tang Does it make sense to add RocksDB to the "always parent-first 
options" to avoid these kind of errors when users package apps incorrectly?
My feeling is that these packaging errors occur very frequently.


On Wed, Aug 4, 2021 at 10:41 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Sandeep,

Did you include the RocksDB classes in the application jar package? You can 
unpack your jar package to check whether they exist.
If so, since RocksDB classes are already included in the flink-dist package, 
you don't need to include them in your jar package (maybe you explicitly added 
the dependency of org.rocksdb:rocksdbjni in your pom).

Best
Yun Tang

From: Sandeep khanzode mailto:sand...@shiftright.ai>>
Sent: Wednesday, August 4, 2021 11:54
To: user mailto:user@flink.apache.org>>
Subject: Bloom Filter - RocksDB - LinkageError Classloading

Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

public DBOptions createDBOptions(DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
return currentOptions.setMaxOpenFiles(1024);
}

public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection<AutoCloseable> handlesToClose) {
BloomFilter bloomFilter = new BloomFilter();
handlesToClose.add(bloomFilter);

return currentOptions
.setTableFormatConfig(
new 
BlockBasedTableConfig().setFilter(bloomFilter));
}
 });

This is in the main class where we set up the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...

Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-conf.yaml … but then I get different issues for the basic/core Spring 
Framework classes not being serializable …

Any help will be appreciated.

Thanks,
Sandip


Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-05 Thread Yun Tang
Hi Piotr,


1. Can we push for better benchmark coverage in the RocksDB project in the
future?
Sure, I think we could contribute what we did in flink-benchmarks to
improve their JMH benchmark [1]. And I will ask them how often they run
the benchmark.

2. Can we try to catch this kind of problem with RocksDB earlier? For
example with more frequent RocksDB upgrades, or building test Flink builds with
the most recent RocksDB version to run our benchmarks and validate newer
RocksDB versions?
I think this advice could make sense. Apart from releasing our own
FRocksDB every time, which would take several days to build and distribute
across Maven repos, I would prefer to use the pre-built official RocksDB jar package, and
we would need to maintain a Flink branch which does not contain the FRocksDB TTL
feature so that the official RocksDB jar package could run directly.

[1] https://github.com/facebook/rocksdb/tree/master/java/jmh

Best,
Yun Tang

From: Piotr Nowojski 
Sent: Thursday, August 5, 2021 2:01
To: Yuval Itzchakov 
Cc: Yun Tang ; Nico Kruber ; 
user@flink.apache.org ; dev 
Subject: Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

Thanks for the detailed explanation Yun Tang and clearly all of the effort you 
have put into it. Based on what was described here I would also vote for going 
forward with the upgrade.

It's a pity that this regression wasn't caught in the RocksDB community. I 
would have two questions/ideas:
1. Can we push for better benchmark coverage in the RocksDB project in the 
future?
2. Can we try to catch this kind of problems with RocksDB earlier? For example 
with more frequent RocksDB upgrades, or building test flink builds with the 
most recent RocksDB version to run our benchmarks and validate newer RocksDB 
versions?

Best,
Piotrek

On Wed, Aug 4, 2021 at 19:59 Yuval Itzchakov 
mailto:yuva...@gmail.com>> wrote:
Hi Yun,
Thank you for the elaborate explanation and even more so for the super hard 
work that you're doing digging into RocksDB and chasing after hundreds of 
commits in order to fix them so we can all benefit.

I can say for myself that optimizing towards memory is more important ATM for 
us, and I'm totally +1 for this.

On Wed, Aug 4, 2021 at 8:50 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Yuval,

Upgrading RocksDB version is a long story since Flink-1.10.
When we first plan to introduce write buffer manager to help control the memory 
usage of RocksDB, we actually wanted to bump up to RocksDB-5.18 from current 
RocksDB-5.17. However, we found performance regression in our micro benchmark 
on state operations [1] if bumped to RocksDB-5.18. We did not figure the root 
cause at that time and decide to cherry pick the commits of write buffer 
manager to our own FRocksDB [2]. And we finally released our own 
frocksdbjni-5.17.2-artisans-2.0 at that time.

As time goes on, more and more bugs or missing features have been reported in 
the old RocksDB version, such as:

  1.  Cannot support ARM platform [3]
  2.  Does not have a stable deleteRange API, which is useful for Flink scale out 
[4]
  3.  Cannot support strict block cache [5]
  4.  Checkpoint might get stuck if using the UNIVERSAL compaction strategy [6]
  5.  Uncontrolled log size made us disable the RocksDB internal LOG [7]
  6.  RocksDB's optimizeForPointLookup option might cause data loss [8]
  7.  The current dummy entry used for memory control in RocksDB-5.17 is too large, 
leading to performance problems [9]
  8.  Cannot support alpine-based images.
  9.  ...

Some of the bugs are walked around, and some are still open.

And we decide to make some changes from Flink-1.12. First of all, we reported 
the performance regression problem compared with RocksDB-5.18 and RocksDB-5.17 
to RocksDB community [10]. However, as RocksDB-5.x versions are a bit older for 
the community, and RocksJava usage might not be the core part for facebook 
guys, we did not get useful replies. Thus, we decided to figure out the root 
cause of the performance regression by ourselves.
Fortunately, we found the cause via a binary search over the commits between RocksDB-5.18 
and RocksDB-5.17, and updated the original thread [10]. In short, the 
performance regression is due to different implementation of `__thread` and 
`thread_local` in gcc and would have more impact on dynamic loading [11], which 
is also what current RocksJava jar package does. With my patch [12], the 
performance regression would disappear if comparing RocksDB-5.18 with 
RocksDB-5.17.

Unfortunately, RocksDB-5.18 still has many bugs and we want to bump to 
RocksDB-6.x. However, another performance regression appeared even with my 
patch [12]. With previous knowledge, we know that we must verify the built .so 
files with our java-based benchmark instead of using RocksDB built-in db-bench. 
I started to search the 1340+ commits from RocksDB-5.18 to RocksDB-6.11 to find 
the performance problem. However, I did not figure out the root