Re: Unit / Integration tests for Table API ?

2020-05-15 Thread Jark Wu
Hi,

Flink Table & SQL has a testing suite for integration tests.
You can have a look at
`org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase`.
We use a bounded source stream and a testing sink to collect and
verify the results.
You need to add the following dependency to access the testing utils.

  
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
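
As a rough, self-contained illustration of this approach (this is not the actual
AggregateITCase and does not use the test-jar utilities; class, field and query
names below are invented), such a test on Flink 1.10 could look roughly like:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MyAggregateITCase {

    // Static list: sink instances are serialized and shipped to the workers, so
    // results must land in static state that the test thread can read afterwards.
    private static final List<Row> RESULTS = new CopyOnWriteArrayList<>();

    public void testAggregation() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Bounded source: fromElements produces a finite stream, so env.execute() returns.
        Table input = tEnv.fromDataStream(
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3)),
                "name, score");
        tEnv.createTemporaryView("T", input);

        Table result = tEnv.sqlQuery("SELECT name, SUM(score) FROM T GROUP BY name");
        tEnv.toRetractStream(result, Row.class).addSink(new CollectSink());

        env.execute("table-it-case");
        // verify RESULTS here, e.g. with JUnit assertions
    }

    private static class CollectSink implements SinkFunction<Tuple2<Boolean, Row>> {
        @Override
        public void invoke(Tuple2<Boolean, Row> value) {
            if (value.f0) { // keep insertions, drop retraction messages
                RESULTS.add(value.f1);
            }
        }
    }
}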



Best,
Jark

On Fri, 15 May 2020 at 23:09, Darlo Bobo  wrote:

> Hello,
>
> I have built a small Flink app which receives events (JSON events),
> deserializes them to an object and then uses the Table API to create two
> tables, do some joins and then write the results back to a Kafka stream.
>
> What is the suggested method to correctly test that the code written with
> the Table API works as expected? Any best practices for ITs?
> Is the best way to simply run the app through Flink and then test it with
> some external tool such as a small Python script?
>
> (Using Flink 1.8.3)
>
> Thank you,
>
> Darlo
>
>
>
>


Re: Help with table-factory for SQL

2020-05-15 Thread Jark Wu
Hi,

Could you share the SQL DDL and the full exception message? It might be that
you are using the wrong `connector.version` or other options.
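
For reference, a working Kafka source registration with the legacy connector
properties typically looks roughly like the sketch below (table name, fields,
topic and servers are placeholders, and the exact option keys differ slightly
between versions - e.g. Flink 1.9 uses indexed
'connector.properties.0.key'/'connector.properties.0.value' pairs instead of the
1.10-style bootstrap.servers key shown here):

// hypothetical sketch; tableEnv is your StreamTableEnvironment
tableEnv.sqlUpdate(
    "CREATE TABLE user_events (" +
    "  user_id STRING," +
    "  event_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'events'," +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format.type' = 'json'," +
    "  'update-mode' = 'append'" +
    ")");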

Best,
Jark

On Fri, 15 May 2020 at 20:14, Martin Frank Hansen 
wrote:

>   Hi,
>
> I am trying to connect to Kafka through Flink, but I am having some difficulty
> getting the right table factory source.
>
> I currently get the error: NoMatchingTableFactoryException: Could not
> find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. My
> sbt file looks like this:
>
> name := "writeToSQL"
>
> version := "0.1"
>
> scalaVersion := "2.11.12"
> val flinkVersion = "1.9.1"
> val hadoopVersion = "3.0.0"
>
> libraryDependencies ++= Seq(
>
>   "org.slf4j" % "slf4j-api" % "1.7.15" % "runtime",
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "compile",
>   "org.apache.flink" %% "flink-sql-connector-kafka" % flinkVersion % 
> "compile",
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion %  
> "provided",
>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion %  
> "provided",
>   "org.apache.flink" %% "flink-table-planner" % flinkVersion %  "provided",
>   "org.apache.flink" % "flink-table-common" % flinkVersion %  "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" % "flink-table" % flinkVersion % "compile",
>   "org.apache.flink" % "flink-json" % flinkVersion % "compile",
>   "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "runtime"
> )
>
> assemblyMergeStrategy in assembly := {
>   case path if path.contains("META-INF/services") => MergeStrategy.concat
>   case PathList("META-INF", _*) => MergeStrategy.discard
>   case _ => MergeStrategy.first
> }
>
>
> From the documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory
> I can see what is missing, but I do not know how to solve it.
>
> The documentation says the following:
> Define a TableFactory
> 
>
> A TableFactory allows to create different table-related instances from
> string-based properties. All available factories are called for matching to
> the given set of properties and a corresponding factory class.
>
> Factories leverage Java’s Service Provider Interfaces (SPI) for
> discovering. This means that every dependency and JAR file should contain a
> file org.apache.flink.table.factories.TableFactory in the
> META-INF/services resource directory that lists all available table
> factories that it provides.
>
> But how do I do that? I thought the sbt-file would take care of this.
>
> Any help is highly appreciated!
>
> Best regards
>
> Martin Frank Hansen
>
>
>


Re: Watermarks and parallelism

2020-05-15 Thread Gnanasoundari Soundarajan
Thanks Alexander for your detailed response.

I have a requirement that each asset will communicate a different event time due
to connectivity issues. If I have 50 assets and each communicates with a different
event time, I should not lose data because of lateness.

To handle this, I have tried the keyBy operator to route the data by asset
context and to maintain a watermark per asset (key) using a KeyedProcessFunction,
registering an event-time timer for each asset (key). When I tried this option,
I observed that the event-time timer is not triggered in the KeyedProcessFunction
and hence data didn't flow downstream.

I am curious whether this requirement is feasible to achieve in Flink using
event time.

Regards,
Gnana

From: Alexander Fedulov 
Date: Thursday, 14 May 2020 at 9:25 PM
To: Gnanasoundari Soundarajan 
Cc: "user@flink.apache.org" 
Subject: Re: Watermarks and parallelism

Hi Gnana,

1. No, watermarks are generated independently per subtask. I think this section
of the docs might make things more clear [1].

2. The same watermark from the input of the keyBy will be dispatched to all of 
the instances of the downstream keyed operator. That said, there is no global 
coordination between the subtasks. The same watermark can arrive at the 
downstream subtask at a different time, depending on how much time they'd spend 
on the input channels. Notice also that watermarks are managed on the subtask 
level, not at the level of the individual keys.

3. I am not quite sure I get what you mean by this one and what exactly you are
trying to achieve. I assume you want to basically have parallel windows that are
scoped to all of the items coming from a corresponding subtask of the previous
non-keyed operator. As Flink windows can be executed in parallel only on keyed
streams, you could do a little trick - use `reinterpretAsKeyedStream` [2].
This will make it possible to basically have a "passthrough" partitioning,
without an actual data shuffle. Another alternative would be to implement your
Map function as a RichMapFunction, which gives you access to the runtime
context. From there:
1) use `getRuntimeContext().getIndexOfThisSubtask();` to retrieve the ID of the 
current subtask
2) enrich your events with a new field, containing the subtask ID
3) use this ID as the key in your keyBy operator
The problem is that both of those approaches will be non-deterministic in terms 
of state recovery when, for instance, you would like to scale out your job to a 
higher degree of parallelism. You'd need to decide if this is relevant for your 
use case.
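
A minimal sketch of the RichMapFunction variant described above (events are kept
as raw Strings here purely for brevity):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class TagWithSubtask extends RichMapFunction<String, Tuple2<Integer, String>> {
    @Override
    public Tuple2<Integer, String> map(String event) {
        // 1) retrieve the index of the current subtask, 2) enrich the event with it
        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), event);
    }
}

// 3) use the subtask index as the key downstream:
// stream.map(new TagWithSubtask()).keyBy(t -> t.f0).timeWindow(Time.minutes(5)).reduce(...);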

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/concepts/timely-stream-processing.html#watermarks-in-parallel-streams
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

Best,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) 
Cheng



On Thu, May 14, 2020 at 6:14 AM Gnanasoundari Soundarajan
<gnanasoundari.soundara...@man-es.com> wrote:
Hi all,

I have below queries in flink. Could anyone help me to understand?

Query:

1. Is the watermark maintained globally at the operator level?

2. When we have a keyBy operator with parallelism > 1, is there a single watermark
maintained across all the parallel subtasks, or one for each of the parallel subtasks?

3. Assuming I have a keyBy operator with parallelism > 1, is it possible to feed
data to this operator from only one stream from the previous operator (say map
(1) always goes to window (1))?

Regards,
Gnana


Unit / Integration tests for Table API ?

2020-05-15 Thread Darlo Bobo
Hello,

I have built a small Flink app which receives events (JSON events), deserializes
them to an object and then uses the Table API to create two tables, do some
joins and then write the results back to a Kafka stream.

What is the suggested method to correctly test that the code written with the
Table API works as expected? Any best practices for ITs?
Is the best way to simply run the app through Flink and then test it with some
external tool such as a small Python script?

(Using Flink 1.8.3)

Thank you,
Darlo

Re: Protection against huge values in RocksDB List State

2020-05-15 Thread Yun Tang
Hi Robin

I think you could record the size of your list under the current key with another
value state or operator state (store a Map of <key, list-size> entries, and
store the whole map in the operator list state when snapshotting). If you do not
have many key-by keys, operator state is a good choice as it is on-heap and
lightweight.
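
As an illustrative sketch only (the element type, state name and limit are
invented), a count-capping step in front of the session window could look like:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Forwards at most MAX_ELEMENTS records per key; elements are plain Strings for brevity.
public class CapPerKey extends KeyedProcessFunction<String, String, String> {
    private static final int MAX_ELEMENTS = 10_000; // invented limit
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Integer seen = count.value();
        int c = seen == null ? 0 : seen;
        if (c < MAX_ELEMENTS) {
            count.update(c + 1);
            out.collect(value); // forward to the session window downstream
        }
        // else: drop the element (or route it to a side output for monitoring)
    }
}

Note that this sketch never clears the counter state, so in practice you would
combine it with state TTL or clean it up when the corresponding session closes.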

Best
Yun Tang

From: Robin Cassan 
Sent: Friday, May 15, 2020 20:59
To: Yun Tang 
Cc: user 
Subject: Re: Protection against huge values in RocksDB List State

Hi Yun, thanks for your answer! And sorry I didn't see this limitation from the 
documentation, makes sense!
In our case, we are merging too many elements (since each element is limited to 
4 MiB in our Kafka topic). I agree we do not want our state to contain really 
big values, this is why we are trying to find a way to put a limit on the 
number (or total size) of elements that are aggregated in the state of the 
window.
We have found a way to do this by using another sessionWindow that is set 
before the other one, which will store the number of messages for each key and 
reject new messages if we have reached a limit, but we are wondering if there 
is a better way to achieve that without creating another state.

Thanks again,
Robin

On Thu, 14 May 2020 at 19:38, Yun Tang
<myas...@live.com> wrote:
Hi Robin

First of all, the root cause is not that RocksDB cannot store large list state
when you merge, but the JNI limitation of 2^31 bytes [1].
Moreover, the RocksDB Java merge [2] operator would not return anything when you
call it.

Did you merge too many elements, or just very large elements? Last but
not least, even if you could merge a large list, I think getting a value with a
size larger than 2^31 bytes would not behave well.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[2] 
https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382

Best
Yun Tang

From: Robin Cassan <robin.cas...@contentsquare.com>
Sent: Friday, May 15, 2020 0:37
To: user <user@flink.apache.org>
Subject: Protection against huge values in RocksDB List State

Hi all!

I cannot seem to find any setting to limit the number of records appended in a 
RocksDBListState that is used when we use SessionWindows with a ProcessFunction.
It seems that, for each incoming element, the new element will be appended to 
the value with the RocksDB `merge` operator, without any safeguard to make sure 
that it doesn't grow infinitely. RocksDB merge seems to support returning false 
in case of error, so I guess we could implement a limit by returning false in 
the merge operator, but since Flink seems to use the "stringappendtest" merge 
operator ( 
https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
 ), we always return true no matter what.

This is troublesome for us because it would make a lot of sense to specify an 
acceptable limit to how many elements can be aggregated under a given key, and 
because when we happen to have too many elements we get an exception from 
RocksDB:
```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving 
data from RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which 
ensures that we only store one element per key, but this gives us degraded 
performance.

Thanks for your input!
Robin


Memory growth from TimeWindows

2020-05-15 Thread Slotterback, Chris
Hey Flink users,

I wanted to see if I could get some insight on what the heap memory profile of 
my stream app should look like vs my expectation. My layout consists of a 
sequence of FlatMaps + Maps, feeding a pair of 5 minute 
TumblingEventTimeWindows, intervalJoined, into a 24 hour (per 5 minute) 
SlidingEventTimeWindow, then intervalJoined again, back into the first set of 
FlatMaps. The data flow works as expected, and the reports I am generating in 
the last join appear to be correct, and contain info from the 24 hour sliding 
window.

My understanding is that while all these windows build their memory state, I 
can expect heap memory to grow for the 24 hour length of the 
SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
expire and release back to the JVM. What is actually happening is when a 
constant data source feeds the stream, the heap memory profile grows linearly 
past the 24 hour mark. Could this be a result of a misunderstanding of how the 
window’s memory states are kept, or is my assumption correct, and it is more 
likely I have a leak somewhere?

Thanks as always
Chris




Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-15 Thread Robert Metzger
Flink 1.11 will support Hadoop 3. EMR 6 requires Hadoop 3, that's why Flink
was not included anymore. Amazon will add Flink back to EMR 6.0 soon.

On Thu, May 14, 2020 at 7:11 PM aj  wrote:

> Hi Yang,
>
> I am able to resolve the issue by removing Hadoop dependency as you
> mentioned.
>
> 1. Removed hadoop-common dependency and
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <exclusions>
>     <exclusion>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-hadoop-fs</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
>
>
> 2. After the above changes, I am able to submit the job on YARN, but I faced
> issues with the s3 plugin that I resolved by putting
> flink-s3-fs-hadoop-1.9.0.jar in the plugins/s3-fs-hadoop directory.
>
> Thanks for your support.
>
> Any update on when Flink 1.10 will be officially supported in EMR? Even in the
> new EMR version (EMR 6.0), Flink has been removed.
>
>
>
> On Sat, May 9, 2020 at 1:36 PM aj  wrote:
>
>> Hello Yang,
>>
>> I have attached my pom file and I do not see that I am using any Hadoop
>> dependency. Can you please help me?
>>
>> On Wed, May 6, 2020 at 1:22 PM Yang Wang  wrote:
>>
>>> Hi aj,
>>>
>>> From the logs you have provided, the hadoop version is still 2.4.1.
>>> Could you check whether the user jar (i.e. events-processor-1.0-SNAPSHOT.jar)
>>> contains some
>>> hadoop classes? If it does, you need to exclude the hadoop dependency.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> aj wrote on Wed, May 6, 2020 at 3:38 PM:
>>>
 Hello,

 Please help me upgrade to 1.10 in AWS EMR.

 On Fri, May 1, 2020 at 4:05 PM aj  wrote:

> Hi Yang,
>
> I am attaching the logs for your reference, please help me what i am
> doing wrong.
>
> Thanks,
> Anuj
>
> On Wed, Apr 29, 2020 at 9:06 AM Yang Wang 
> wrote:
>
>> Hi Anuj,
>>
>> I think the exception you came across is still because the hadoop version
>> is 2.4.1. I have checked the hadoop code, and the code lines are exactly the
>> same.
>> For 2.8.1, I have also checked the ruleParser. It could work.
>>
>> /**
>>  * A pattern for parsing a auth_to_local rule.
>>  */
>> private static final Pattern ruleParser =
>>   
>> Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>>   "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>>
>>
>> Could you share the jobmanager logs so that I could check the
>> classpath and hadoop version?
>>
>> Best,
>> Yang
>>
>> aj wrote on Tue, Apr 28, 2020 at 1:01 AM:
>>
>>> Hello Yang,
>>> My Hadoop version is Hadoop 3.2.1-amzn-0,
>>> and I have put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar in flink/lib.
>>>
>>> then I am getting below error :
>>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>> Invalid rule: /L
>>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>>   DEFAULT
>>> at
>>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>>> at
>>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>>> at
>>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>>>
>>>
>>> If I remove the flink-shaded-hadoop-2-uber-2.8.3-10.0.jar from
>>> lib, then I get the below error:
>>>
>>> 2020-04-27 16:59:37,293 INFO
>>>  org.apache.flink.client.cli.CliFrontend   -
>>>  Classpath:
>>> /usr/lib/flin

Re: Protection against huge values in RocksDB List State

2020-05-15 Thread Robin Cassan
Hi Yun, thanks for your answer! And sorry I didn't see this limitation from
the documentation, makes sense!
In our case, we are merging too many elements (since each element is
limited to 4 MiB in our Kafka topic). I agree we do not want our state to
contain really big values, this is why we are trying to find a way to put a
limit on the number (or total size) of elements that are aggregated in the
state of the window.
We have found a way to do this by using another sessionWindow that is set
before the other one, which will store the number of messages for each key
and reject new messages if we have reached a limit, but we are wondering if
there is a better way to achieve that without creating another state.

Thanks again,
Robin

On Thu, 14 May 2020 at 19:38, Yun Tang wrote:

> Hi Robin
>
> First of all, the root cause is not that RocksDB cannot store large list state
> when you merge, but the JNI limitation of 2^31 bytes [1].
> Moreover, the RocksDB Java merge [2] operator would not return anything when
> you call it.
>
> Did you merge too many elements, or just very large elements? Last
> but not least, even if you could merge a large list, I think getting a value
> with a size larger than 2^31 bytes would not behave well.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
> [2]
> https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>
> Best
> Yun Tang
> --
> *From:* Robin Cassan 
> *Sent:* Friday, May 15, 2020 0:37
> *To:* user 
> *Subject:* Protection against huge values in RocksDB List State
>
> Hi all!
>
> I cannot seem to find any setting to limit the number of records appended
> in a RocksDBListState that is used when we use SessionWindows with a
> ProcessFunction.
> It seems that, for each incoming element, the new element will be appended
> to the value with the RocksDB `merge` operator, without any safeguard to
> make sure that it doesn't grow infinitely. RocksDB merge seems to support
> returning false in case of error, so I guess we could implement a limit by
> returning false in the merge operator, but since Flink seems to use the
> "stringappendtest" merge operator (
> https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
>  ),
> we always return true no matter what.
>
> This is troublesome for us because it would make a lot of sense to specify
> an acceptable limit to how many elements can be aggregated under a given
> key, and because when we happen to have too many elements we get an
> exception from RocksDB:
> ```
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while
> retrieving data from RocksDB
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ... 7 more
> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM
> limit
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
> ... 12 more
> ```
>
> We are currently bypassing this by using a Reduce operator instead, which
> ensures that we only store one element per key, but this gives us degraded
> performance.
>
> Thanks for your input!
> Robin
>


Developing Beam applications using Flink checkpoints

2020-05-15 Thread Ivan San Jose
Hi, we are starting to use Beam with Flink as the runner for our
applications, and recently we would like to get the advantages that Flink
checkpointing provides, but it seems we are not understanding it
clearly.

Simplifying, our application does the following:
  - Read messages from a couple of Kafka topics
  - Combine them
  - Write combination result to a sink (Exasol DB)

As the application processes messages using event time, and one of the
topics is almost idle, the first time the application is started messages
are stuck in the combiner because the watermark doesn't advance until we have
messages arriving on the idle topic (we know this and it is not a problem
for us though).

The problem is that we've observed that, if a checkpoint is triggered when
messages are still stuck in the combiner, surprisingly for us, the
checkpoint finishes successfully (and offsets are committed to Kafka) even
though the messages haven't progressed to the sink yet. Is this expected?

The thing is that, if in the future we make state-incompatible
changes in the application source code, the checkpoint taken couldn't be
restored. So we would like to start the application without using any
checkpoint, but without losing data.
The problem here would be that data loss would happen because messages
stuck in the combiner are already committed to Kafka, and the application would
start to read from the latest committed offset in Kafka if we don't use any
checkpoint, thus those messages are not going to be read from the
source again.

So, I guess our question is: what do you do in order to not lose data
when developing applications, because sooner or later you are going to
add breaking changes...

For example, we've seen those two errors so far:
  - After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster
entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
leadership with session id ----.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
not set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRu
nner.java:152)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.crea
teJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerR
unner$5(Dispatcher.java:375)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(Check
edSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-
savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986.
Cannot map checkpoint/savepoint state for operator
f476451c6210bd2783f36fa331b9da5e to the new program, because the
operator is not available in the new program. If you want to allow to
skip this, you can set the --allowNonRestoredState option on the CLI.
at
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoi
nt(Checkpoints.java:205)
...

  - After modifying a Java model class involved in a combine:
org.apache.flink.runtime.state.BackendBuildingException: Failed when
trying to restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(
HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedSta
teBackend(FsStateBackend.java:529)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attem
ptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.creat
eAndRestore(BackendRestorerProcedure.java:121)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initial
izeState(AbstractStreamOperator.java:253)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(Str
eamTask.java:881)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.j
ava:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException:
internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local
class incompatible: stream classdesc serialVersionUID =
8366890161513008789, local class serialVersionUID = 174312384610985998
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
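
(One general mitigation for the second error, independent of Beam/Flink: if your
model classes go through Java serialization, pinning an explicit
serialVersionUID prevents a mere recompilation from changing the auto-generated
UID - a sketch, with the class body invented:)

import java.io.Serializable;

public class WalletMetadata implements Serializable {
    // Fixed explicitly; actual field changes must of course still stay compatible.
    private static final long serialVersionUID = 1L;
    // ... fields ...
}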


Apologies in advance as we are new to Flink, so may be w

Help with table-factory for SQL

2020-05-15 Thread Martin Frank Hansen
  Hi,

I am trying to connect to Kafka through Flink, but I am having some difficulty
getting the right table factory source.

I currently get the error: NoMatchingTableFactoryException: Could not find
a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath. My
sbt file looks like this:

name := "writeToSQL"

version := "0.1"

scalaVersion := "2.11.12"
val flinkVersion = "1.9.1"
val hadoopVersion = "3.0.0"

libraryDependencies ++= Seq(

  "org.slf4j" % "slf4j-api" % "1.7.15" % "runtime",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "compile",
  "org.apache.flink" %% "flink-sql-connector-kafka" % flinkVersion % "compile",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion
%  "provided",
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion %
"provided",
  "org.apache.flink" %% "flink-table-planner" % flinkVersion %  "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion %  "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" % "flink-table" % flinkVersion % "compile",
  "org.apache.flink" % "flink-json" % flinkVersion % "compile",
  "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "runtime"
)

assemblyMergeStrategy in assembly := {
  case path if path.contains("META-INF/services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}


From the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory
I can see what is missing, but I do not know how to solve it.

The documentation says the following:
Define a TableFactory


A TableFactory allows to create different table-related instances from
string-based properties. All available factories are called for matching to
the given set of properties and a corresponding factory class.

Factories leverage Java’s Service Provider Interfaces (SPI) for
discovering. This means that every dependency and JAR file should contain a
file org.apache.flink.table.factories.TableFactory in the
META-INF/services resource
directory that lists all available table factories that it provides.

But how do I do that? I thought the sbt-file would take care of this.
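
To make this concrete: the fat jar is expected to contain a plain-text file at
META-INF/services/org.apache.flink.table.factories.TableFactory whose lines are
fully-qualified factory class names, for example (class names quoted from
memory, they may differ between versions):

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

The MergeStrategy.concat rule for META-INF/services in the sbt file above is
meant to preserve these entries from the connector and format jars when building
the fat jar; if the jars that provide the factories are not on the classpath at
runtime, the factory cannot be discovered either.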

Any help is highly appreciated!

Best regards

Martin Frank Hansen


Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Benchao Li
Thanks Yu for the great work, and everyone else who made this possible.

Dian Fu wrote on Fri, May 15, 2020 at 6:55 PM:

> Thanks Yu for managing this release and everyone else who made this
> release possible. Good work!
>
> Regards,
> Dian
>
> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
>
> Thanks Yu for being our release manager and everyone else who made the
> release possible!
>
> Cheers,
> Till
>
> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu 
> wrote:
>
>> Thanks a lot for the release and your great job, Yu!
>> Also thanks to everyone who made this release possible!
>>
>> Best,
>> Congxian
>>
>>
>> Yu Li wrote on Thu, May 14, 2020 at 1:59 AM:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.10.1, which is the first bugfix release for the Apache Flink
>>> 1.10 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Yu
>>>
>>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink Key based watermarks with event time

2020-05-15 Thread Congxian Qiu
Hi

Maybe you can try KeyedProcessFunction [1] for this, but you need to handle
the allowed-lateness logic [2] in your own business logic (event-time records
may be out of order).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#the-keyedprocessfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#allowed-lateness
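
A bare-bones sketch of the KeyedProcessFunction approach above (the input shape
(sensorId, eventTimestamp), the output type and the 10s delay are all
placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerSensorTimers extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
    private transient ValueState<Long> maxTimestamp;

    @Override
    public void open(Configuration parameters) {
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("maxTimestamp", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        Long seen = maxTimestamp.value();
        if (seen == null || value.f1 > seen) {
            maxTimestamp.update(value.f1);
            // fire 10s (event time) after the latest element seen for this sensor
            ctx.timerService().registerEventTimeTimer(value.f1 + 10_000L);
        }
        // ... accumulate per-sensor state here ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Emit the per-sensor result here. Note that event-time timers only fire when
        // the subtask's watermark advances, so an idle source partition can prevent
        // them from ever firing (which matches the behaviour described in the thread).
        out.collect("result for " + ctx.getCurrentKey() + " at " + timestamp);
    }
}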
Best,
Congxian


Gnanasoundari Soundarajan
wrote on Thu, May 14, 2020 at 12:10 PM:

> Hi all,
>
>
>
> We have a requirement where we need to maintain key-based watermarks with
> *event time*. Each sensor communicates with a different timestamp, so we
> need to maintain a watermark separately for each sensor. Can this requirement
> be achieved with Flink?
>
>
>
> Thanks.
>
>
>
> Regards,
>
> Gnana
>


Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Dian Fu
Thanks Yu for managing this release and everyone else who made this release 
possible. Good work!

Regards,
Dian

> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
> 
> Thanks Yu for being our release manager and everyone else who made the 
> release possible!
> 
> Cheers,
> Till
> 
> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu wrote:
> Thanks a lot for the release and your great job, Yu!
> Also thanks to everyone who made this release possible!
> 
> Best,
> Congxian
> 
> 
> Yu Li <car...@gmail.com> wrote on Thu, May 14, 2020 at 1:59 AM:
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.10.1, which is the first bugfix release for the Apache Flink 1.10 
> series.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
> 
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2020/05/12/release-1.10.1.html 
> 
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>  
> 
> 
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> 
> Regards,
> Yu



Testing process functions

2020-05-15 Thread Manas Kale
Hi,
How do I test process functions? I tried by implementing a sink function
that stores myProcessFunction's output in a list. After env.execute(), I
use assertions.
If I set a breakpoint in the myTestSink's invoke() method, I see that that
method is being called correctly. However, after env.execute() returns, all
data in sink functions is wiped clean.

MyTestSink myTestSink = new MyTestSink();
testStream.process(new MyProcessFunction()).addSink(myTestSink);
env.execute("test");
assertEquals(expectedOutput, myTestSink.actual);

What am I doing wrong?
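
For what it's worth, a common variant of this pattern collects results into a
static collection, because Flink serializes and ships copies of the sink, so
instance fields of the locally created sink object stay empty. A rough sketch
(output type and names are placeholders):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectSink implements SinkFunction<String> {
    public static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(String value, Context context) {
        VALUES.add(value);
    }
}

// in the test:
// CollectSink.VALUES.clear();
// testStream.process(new MyProcessFunction()).addSink(new CollectSink());
// env.execute("test");
// assertEquals(expectedOutput, CollectSink.VALUES);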
Also, I see that ProcessFunctionTestHarnesses has been added in 1.10. I
wasn't able to download its sources to understand how I could use it.
Have the sources not been added to Maven, or is it a problem at my end?

Regards,
Manas


Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Till Rohrmann
Thanks Yu for being our release manager and everyone else who made the
release possible!

Cheers,
Till

On Fri, May 15, 2020 at 9:15 AM Congxian Qiu  wrote:

> Thanks a lot for the release and your great job, Yu!
> Also thanks to everyone who made this release possible!
>
> Best,
> Congxian
>
>
> Yu Li wrote on Thu, May 14, 2020 at 1:59 AM:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.10.1, which is the first bugfix release for the Apache Flink
>> 1.10 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Yu
>>
>


Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Yun Tang
Hi Roc

Blink-1.5 never made the promise that it would be compatible with
Flink-1.10.
Moreover, the SavepointV3Serializer in Blink is totally not the same thing as in
Flink, and the reason why we introduced SavepointV3Serializer is that we used a
different state handle when we open-sourced Blink. As you can see, we use
"org.apache.flink.runtime.state.KeyGroupsStateSnapshot" instead of
"org.apache.flink.runtime.state.KeyGroupsStateHandle", and thus the savepoint
generated by Blink cannot be easily consumed by Flink.

Best
Yun Tang


From: Congxian Qiu 
Sent: Friday, May 15, 2020 17:20
To: Roc Marshal 
Cc: user 
Subject: Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

Hi,
Could you please share the stack or the log message?
If I understand correctly, savepoint V3 is not contained in 1.10.

Best,
Congxian


Roc Marshal <flin...@126.com> wrote on Fri, May 15, 2020 at 4:33 PM:
Hi, all.

When using savepoint to upgrade a Flink job from blink-1.5 to flink-1.10,
the system prompts that blink savepointV3 is incompatible with the version in 
Flink. Is there any solution?

Thank you so much.








Sincerely,
Roc Marshal






Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-05-15 Thread orionemail
Hi,

We also recently needed this functionality; unfortunately, we were unable to
implement it ourselves, so we changed our plan accordingly.

However, we very much see the benefit of this feature and would be interested
in following the JIRA ticket.

Thanks

‐‐‐ Original Message ‐‐‐
On Thursday, 14 May 2020 11:34, Xiaolong Wang  
wrote:

> Thanks, I'll check it out.
>
> On Thu, May 14, 2020 at 6:26 PM Tzu-Li (Gordon) Tai  
> wrote:
>
>> Hi Xiaolong,
>>
>> You are right, the way the Kinesis connector is implemented / the way the 
>> AWS APIs are used, does not allow it to consume Kinesis streams with 
>> enhanced fan-out enabled consumers [1].
>> Could you open a JIRA ticket for this?
>> As far as I can tell, this could be a valuable contribution to the connector 
>> for Kinesis users who require dedicated throughput isolated from other 
>> running consumers.
>>
>> Cheers,
>> Gordon
>>
>> [1] 
>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>
>> On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang  
>> wrote:
>>
>>> Hello Flink Community!
>>>
>>>   I'm currently working on a project relying on AWS Kinesis. With the
>>> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
>>> messages.
>>>
>>>  But as the main stream is used among several other teams, I was
>>> required to use the enhanced fan-out of Kinesis. I checked the connector code
>>> and found no implementation of it.
>>>
>>>  Has this issue occurred to anyone before?
>>>
>>> Thanks for your help.

Re: the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Congxian Qiu
Hi,
Could you please share the stack or the log message?
If I understand correctly, savepoint V3 is not contained in 1.10.

Best,
Congxian


Roc Marshal wrote on Fri, May 15, 2020 at 4:33 PM:

> Hi, all.
>
> When using savepoint to upgrade a Flink job from blink-1.5 to flink-1.10,
> the system prompts that blink savepointV3 is incompatible with the version
> in Flink. Is there any solution?
>
> Thank you so much.
>
>
>
>
>
>
>
>
> Sincerely,
> Roc Marshal
>
>
>
>


Re: Incremental state with purging

2020-05-15 Thread Congxian Qiu
Hi
From your description, you want to do two things:
1. update the state and remove the state older than x
2. output the state every y seconds

From my side, the first can be done by using TTL state as Yun said, and
the second can be done by using a KeyedProcessFunction [1].

If you want to have complex logic to remove the older state in step 1,
maybe you can also use the KeyedProcessFunction and its timers.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#the-keyedprocessfunction
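
For the TTL part, a minimal fragment sketch (state name and the x = 60s
retention are placeholders; this would sit in open() of a rich/keyed function):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(60))                                  // "older than x"
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("elements", String.class, Long.class);
descriptor.enableTimeToLive(ttlConfig);
// The periodic (every y seconds) output can then be driven by timers in a KeyedProcessFunction.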
Best,
Congxian


Yun Tang wrote on Wed, May 13, 2020 at 7:42 PM:

> Hi
>
> From your description: "output the state every y seconds and remove old
> elements", I think TTL [1] is the proper solution for your scenario. And
> you could define the TTL of your state as y seconds so that the process function
> could only print elements from the last y seconds.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Best
> Yun Tang
> --
> *From:* Annemarie Burger 
> *Sent:* Wednesday, May 13, 2020 2:46
> *To:* user@flink.apache.org 
> *Subject:* Incremental state with purging
>
> Hi,
>
> I'm trying to implement the most efficient way to incrementally put
> incoming
> DataStream elements in my (map) state, while removing old elements (older
> than x) from that same state. I then want to output the state every y
> seconds. I've looked into using the ProcessFunction with onTimer, or
> building my own Trigger for a window function, but I struggle with putting
> all this together in a logical and efficient way. Since the state is very
> big I don't want to duplicate it over multiple (sliding)windows. Does
> anybody know the best way to achieve this? Some pseudo code would be very
> helpful.
>
> Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Roc Marshal
Hi, all.


When using savepoint to upgrade a Flink job from blink-1.5 to flink-1.10, 
the system prompts that blink savepointV3 is incompatible with the version in 
Flink. Is there any solution? 


Thank you so much.
















Sincerely,
Roc Marshal

Re: Flink suggestions;

2020-05-15 Thread Chesnay Schepler
Am I understanding you correctly in that, if one sensor of a factory 
raises an alert, then you want all sensors in that same factory to raise 
an alert?


How big is this dataset that maps sensors to factories?

Maybe you can just load them into a Map (e.g. sensor ID -> factory ID) in, say, a 
FlatMap, enrich the sensor data stream, keyBy the factory id and then do 
your windowing logic (I assume you have some time window)?
Although this would only work if you want alerts only for sensors in a 
given window.


If you want to have an alert for literally all sensors in a factory then 
you could instead add the FlatMap after the window, and map the sensor 
-> factory -> all_sensors and duplicate the alerts.
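
As a rough sketch of the enrichment step (the tuple shapes and the hard-coded
mapping are invented for illustration):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

// Input: (sensorId, reading); output: (factoryId, sensorId, reading).
public class EnrichWithFactory
        extends RichFlatMapFunction<Tuple2<Integer, Double>, Tuple3<Integer, Integer, Double>> {

    private transient Map<Integer, Integer> sensorToFactory;

    @Override
    public void open(Configuration parameters) {
        // Load the (small) static sensor -> factory mapping once per subtask,
        // e.g. from a file or a database; hard-coded here only for the sketch.
        sensorToFactory = new HashMap<>();
        sensorToFactory.put(9157, 1);
    }

    @Override
    public void flatMap(Tuple2<Integer, Double> event,
                        Collector<Tuple3<Integer, Integer, Double>> out) {
        Integer factoryId = sensorToFactory.get(event.f0);
        if (factoryId != null) {
            out.collect(Tuple3.of(factoryId, event.f0, event.f1));
        }
    }
}

// downstream: enriched.keyBy(e -> e.f0).window(...).aggregate(...) to derive the factory status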


On 15/05/2020 08:21, Aissa Elaffani wrote:

Hello Guys,
I am a beginner in this field of real-time streaming and I am working 
with Apache Flink, and I am not yet familiar with a lot of its features. 
I am building an application in which I receive some sensor data in 
this format: {"status": "Alerte", "classe": " ", "value": 
{"temperature": 15.7}, "mode": "ON", "equipementID": 1, "date": 
"2019-03-20 22:00", "sensorID": 9157}. Each sensor is installed on a 
piece of equipment in a workshop in a factory somewhere. My goal is: 
if one sensor of a factory goes down (status = "Alerte"), I want the 
status of the whole factory to be Alerte. But the stream does not 
contain the factory ID; there is another static data set source that 
contains the data of the factories and the sensors that belong to each one.
So please, I would like to know the optimal way to do this, and the 
aggregation that I need to do.
Sorry for disturbing you, I wish you all the best! And I hope you 
share some of your experience with me!

Best regards,
Aissa





Re: Flink performance tuning on operators

2020-05-15 Thread Chesnay Schepler

Generally there should be no difference.
Can you check whether the maps are running as a chain (as a single task)?
If they are running in a chain, then I would suspect that /something/ 
else is skewing your results.

If not, then the added network/serialization pressure would explain it.

I will assume that the mismatch in variable names in  your second 
example (JsonData vs rawDataAsJson) is just a typo.
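
For completeness, a small fragment sketch of how chaining can be checked and
influenced (assuming `data` is the input DataStream and StringToJson /
JsonToAvroSchema are the map functions from the original mail):

// With default settings and equal parallelism, consecutive maps are chained into a
// single task, so records are handed over as Java objects without serialization:
DataStream<?> chained = data
        .map(new StringToJson())
        .map(new JsonToAvroSchema());

// Forcing a chain break (useful mainly for comparison) introduces a task boundary
// and therefore serialization between the two maps:
DataStream<?> unchained = data
        .map(new StringToJson())
        .map(new JsonToAvroSchema()).disableChaining();

// Chaining can also be disabled globally with env.disableOperatorChaining(),
// or a new chain started explicitly with .startNewChain().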


On 15/05/2020 04:29, Ivan Yang wrote:

Hi,

We have a Flink job that reads data from an input stream, then converts each event 
from a JSON string to an Avro object, and finally writes to parquet files using 
StreamingFileSink with an OnCheckPointRollingPolicy of 5 mins. Basically a stateless 
job. Initially, we use one map operator to convert the JSON string to an Avro object. 
Inside the map function, it goes from String -> JsonObject -> Avro object.

DataStream avroData = data.map(new JsonToAVRO());

When we try to break the map operator to two, one for String to JsonObject, 
another for JsonObject to Avro.

 DataStream JsonData = data.map(new StringToJson());
 DataStream avroData = rawDataAsJson.map(new 
JsonToAvroSchema())

The benchmark shows a significant performance hit when breaking it down into two 
operators. We are trying to understand the Flink internals and why there is such a 
big difference. The setup is using state backend = filesystem. Checkpoint = s3 
bucket. Our event object has 300+ attributes.


Thanks
Ivan





Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-15 Thread Congxian Qiu
Thanks a lot for the release and your great job, Yu!
Also thanks to everyone who made this release possible!

Best,
Congxian


Yu Li wrote on Thu, May 14, 2020 at 1:59 AM:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.10.1, which is the first bugfix release for the Apache Flink 1.10
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Yu
>