Re: Scalaj vs akka as http client for Asyncio Flink

2019-04-11 Thread Andy Hoang
Hi Till, 
Thanks for your reply. I managed to run some experiments; some worked and some
did not. I hope you can give me a bit more insight:

Following your suggestion, I implemented a `RichAsyncFunction` with transient
fields, like this, and got the following error:

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement 
abstract member 'executionContext: ExecutionContextExecutor' in 
‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, 
Either[Throwable, ResponseEntity]]{

  PPLogger.getActivityLogger.info("###INIT --- ")
  @transient implicit var materializer: ActorMaterializer
  @transient implicit var system: ActorSystem
  @transient implicit var executionContext: ExecutionContextExecutor


  override def asyncInvoke(input: Shipment, resultFuture: 
async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = 
Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
  case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)).asJavaCollection)
  }
  case Failure(x)   => {
resultFuture.complete(Iterable(Left(x)).asJavaCollection)
  }
}
  }

  override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
  }
}
```
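For what it's worth, the compiler error above is a plain Scala rule: a `var` declared without an initializer is an abstract member, so the class must either be abstract or give the field a value. Assigning a default with `= _` and setting the real value in `open()` makes the class concrete. Below is only a minimal sketch of that pattern; the String in/out types, the dummy asyncInvoke body and the close() cleanup are placeholders, not the code from this thread:

```
import java.util.Collections

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.ExecutionContextExecutor

class RichAsyncHttpClientSketch extends RichAsyncFunction[String, String] {

  // `= _` gives each field a default value (null), so the members are concrete;
  // @transient keeps them out of the serialized function instance.
  @transient implicit var system: ActorSystem = _
  @transient implicit var materializer: ActorMaterializer = _
  @transient implicit var executionContext: ExecutionContextExecutor = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    system = ActorSystem("my-system")
    executionContext = system.dispatcher
    materializer = ActorMaterializer()
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    // the real HTTP call would go here, using the fields initialized in open()
    resultFuture.complete(Collections.singleton(input))
  }

  override def close(): Unit = {
    if (system != null) system.terminate() // shut the actor system down with the task
    super.close()
  }
}
```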

Also, when using that class I get an error; I guess it is a Java/Scala API
issue. In the Flink docs, the Java examples use RichAsyncFunction while the
Scala examples use AsyncFunction:
```
//AsyncDataStream.unorderedWait(streamShipment, new 
RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ## error Type 
mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: 
RichAsyncHttpClient

```

### 

So I tried to fix my current code again, keeping the transient fields but
moving their initialization into the class body (constructor) as lazy vals:
```


class AsyncHttpClient( args: Array[String] = Array()) extends 
AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

  @transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###INIT --- ")
ActorSystem("my-system")
  }
  @transient implicit lazy val executionContext = {
system.dispatcher
  }
  @transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###DONE --- ")
ActorMaterializer()
  }

  override def asyncInvoke(input: Shipment, resultFuture: 
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
//PPLogger.getActivityLogger.info("###INIT --- ")
//implicit val system = ActorSystem("my-system")
//implicit val executionContext = system.dispatcher
//implicit val materializer: ActorMaterializer = ActorMaterializer()
//PPLogger.getActivityLogger.info("###DONE --- ")

val resultFutureRequested: Future[HttpResponse] = 
Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
  case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
  }
  case Failure(x)   => {
resultFuture.complete(Iterable(Left(x)))
  }
}

  }

  override def timeout(input: Shipment, resultFuture: 
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
  }
}
```
And it runs OK. The init log was printed only once.

I am still asking about this because I do not understand the statement `That way
you only create the `ActorMaterializer` on the `TaskManager` where the operator is
executed and solve the problem of serializability`. I thought all of the code is
executed inside the TaskManager anyway?

Thanks for being patient with me, till here

Andy,




> On Apr 11, 2019, at 7:12 PM, Till Rohrmann  wrote:
> 
> Hi Andy,
> 
> without being an expert of Akka's http client, I think you should not create 
> a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would 
> recommend you instead is to implement a `RichAsyncFunction` with a transient 
> field for `ActorMaterializer` which you initialize in the 
> `RichAsyncFunction#open` method. That way you only create the 
> `ActorMaterializer` on the `TaskManager` where the operator is executed and 
> solve the problem of serializability and you make it much more efficient 
> because you don't create a new `ActorSystem` for every request.
> 
> Cheers,
> Till
> 
> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang  > wrote:
> Hi guys,
> 
> I'm trying to decide which http client to use with Flink. Currently I have tested with 
> the scalaj and akka http clients and both work OK with our current dev environment.
> For scalaj it is pretty straightforward since it is just calling an 

Re: Hbase Connector failed when deployed to yarn

2019-04-11 Thread hai
Hi, Tang:


Thanks for your reply. Will this issue be fixed soon? I don't think putting the
flink-hadoop-compatibility jar under FLINK_HOME/lib is an elegant solution.


Regards


Original Message
Sender: Yun Tang myas...@live.com
Recipient: hai...@magicsoho.com; user u...@flink.apache.org
Date: Friday, Apr 12, 2019 02:02
Subject:Re: Hbase Connector failed when deployed to yarn


Hi


I believe this is the same problem as the one reported in
https://issues.apache.org/jira/browse/FLINK-12163 ; the current workaround is to
put the flink-hadoop-compatibility jar under FLINK_HOME/lib.



Best
Yun Tang

From: hai h...@magicsoho.com
 Sent: Thursday, April 11, 2019 21:06
 To: user
 Subject: Re: Hbase Connector failed when deployed to yarn

And my pom.xml dependencies are:


<dependencies>
    <!-- Scala -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- SL4J & Log4j & Kafka-Appender & Flume-Appender -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- 1.1.1 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>

    <!-- Flink -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib</artifactId>
      <version>2.2.2</version>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>


Original Message
Sender:hai...@magicsoho.com
Recipient:user@flink.apache.org
Date:Thursday, Apr 11, 2019 21:04
Subject:Hbase Connector failed when deployed to yarn


Hello:
  I am new to Flink, and I copied the official HBase connector example from
source
flink/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
and ran it on a yarn-cluster with the command:
 

bin/flink run -m yarn-cluster -yn 2 -c {class-path-prefix}.HBaseWriteExample 
{my-application}.jar
 
 What I got is:



The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for 
the class 'org.apache.hadoop.io.Writable'. You may be missing the 

Re: Question regarding "Insufficient number of network buffers"

2019-04-11 Thread zhijiang
Hi Allen,

There are two ways to configure network buffers. The old way, via
`taskmanager.network.numberOfBuffers`, is deprecated. The new way is via the three
parameters min, max and fraction.
The specific formula is Math.min(network.memory.max,
Math.max(network.memory.min, network.memory.fraction * jvmMemory)).
If both ways are set, only the new way takes effect. You can adjust these three
parameters accordingly.
You can also check the task manager log by searching for " MB for network
buffer pool (number of memory segments: " to confirm whether your setting is
working as expected.
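To make the formula concrete, here is a quick back-of-the-envelope sketch. The 8 GB JVM size is only an assumed figure, and the min/max/fraction values are the defaults as I remember them for 1.7/1.8, so double-check them against your flink-conf.yaml:

```
// network memory = min(max, max(min, fraction * jvmMemory)), in bytes
val jvmMemory  = 8L * 1024 * 1024 * 1024        // assumed TaskManager JVM size
val minMem     = 64L * 1024 * 1024              // taskmanager.network.memory.min (default 64mb)
val maxMem     = 1L * 1024 * 1024 * 1024        // taskmanager.network.memory.max (default 1gb)
val fraction   = 0.1                            // taskmanager.network.memory.fraction (default 0.1)

val networkMem = math.min(maxMem, math.max(minMem, (fraction * jvmMemory).toLong))
val numBuffers = networkMem / 32768             // 32768-byte segments, as in the error quoted below

// With the defaults this yields 1 GiB, i.e. 32768 buffers, which is exactly the
// number reported in the error message, so it is worth checking whether the
// overridden values are actually being picked up.
```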

Best,
Zhijiang
--
From:Xiangfeng Zhu 
Send Time: Friday, Apr 12, 2019 08:03
To:user 
Subject:Question regarding "Insufficient number of network buffers"

Hello,

My name is Allen, and I'm currently researching different distributed execution 
engines. I wanted to run some benchmarks on Flink with a 10-node cluster (each 
node has 64 vCPUs and 376 GB memory). I ran the program with parallelism 320 and 
got an error message: 
"Caused by: java.io.IOException: Insufficient number of network buffers: 
required 320, but only 128 available. The total number of network buffers is 
currently set to 32768 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'." 

Currently, I set the following parameters:
jobmanager.heap.size: 102400m
taskmanager.memory.size: 102400m
taskmanager.numberOfTaskSlots: 32
taskmanager.network.memory.min: 102400m
taskmanager.network.memory.max: 102400m
taskmanager.network.memory.fraction: 0.5
(For the last three fields, I've also tried to set 
taskmanager.network.numberOfBuffers: 40960 directly)
Could you please give me some advice about how I should fix it?
Thank you so much! 

Best,
Allen



Re: Version "Unknown" - Flink 1.7.0

2019-04-11 Thread Vishal Santoshi
Hello ZILI,
  I run Flink from the distribution downloaded from
https://flink.apache.org/downloads.html#apache-flink-180.
In my case, the Flink pipeline runs as a job cluster on k8s.

Regards.
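For reference, a minimal way to reproduce the check tison describes in the quoted reply below (just a sketch; it assumes flink-runtime is on the classpath):

```
import org.apache.flink.runtime.util.EnvironmentInformation

// Prints the Implementation-Version taken from the jar manifest; a null or
// empty value here is what the UI/REST API renders as an unknown version.
object VersionCheck extends App {
  println(EnvironmentInformation.getVersion)
}
```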


On Sat, Feb 2, 2019 at 12:24 PM ZILI CHEN  wrote:

> The version is generated in EnvironmentInformation#getVersion. As the
> comment stands,
> the version can be null(and rendered as "") if the JobManager
> does not run from a Maven build.
>
> Specifically Flink getVersion by "version =
> EnvironmentInformation.class.getPackage().getImplementationVersion();"
>
> Is it your situation?
>
> Best,
> tison.
>
>
> Vishal Santoshi wrote on Sat, Feb 2, 2019 at 10:27 PM:
>
>> +1 ( though testing in JOB mode  on k8s )
>>
>> On Fri, Feb 1, 2019 at 6:45 PM anaray  wrote:
>>
>>> Though not a major issue. I see that Flink UI and REST api gives flink
>>> version as "UNKNOWN"
>>> I am using flink 1.7.0, with and running the cluster in JOB mode.
>>>
>>> REST endpoint /overview output
>>>
>>> {"taskmanagers":1,"slots-total":4,"slots-available":3,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"*flink-version":"*","flink-commit":"49da9f9"}
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: K8s job cluster and cancel and resume from a save point ?

2019-04-11 Thread Vishal Santoshi
I confirm that 1.8.0 fixes all the above issues. The JM process exits with
code 0 and the pod ends up in the TERMINATED state. The above is true for both
the PATCH cancel and the POST savepoint-with-cancel calls above.

Thank you for fixing this issue.


On Wed, Mar 13, 2019 at 10:17 AM Vishal Santoshi 
wrote:

> BTW, does 1.8 also solve the issue where we can cancel with a save point.
> That too is broken in 1.7.2
>
> curl  --header "Content-Type: application/json" --request POST --data 
> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":*true*}' 
>https://*/jobs//savepoints
>
>
> On Tue, Mar 12, 2019 at 11:55 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Awesome, thanks!
>>
>> On Tue, Mar 12, 2019 at 11:53 AM Gary Yao  wrote:
>>
>>> The RC artifacts are only deployed to the Maven Central Repository when
>>> the RC
>>> is promoted to a release. As written in the 1.8.0 RC1 voting email [1],
>>> you
>>> can find the maven artifacts, and the Flink binaries here:
>>>
>>> -
>>> https://repository.apache.org/content/repositories/orgapacheflink-1210/
>>> - https://dist.apache.org/repos/dist/dev/flink/flink-1.8.0-rc1/
>>>
>>> Alternatively, you can apply the patch yourself, and build Flink 1.7 from
>>> sources [2]. On my machine this takes around 10 minutes if tests are
>>> skipped.
>>>
>>> Best,
>>> Gary
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
>>>
>>> On Tue, Mar 12, 2019 at 4:01 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Do you have a mvn repository ( at mvn central )  set up for 1,8 release
 candidate. We could test it for you.

 Without 1.8 and this exit code we are essentially held up.

 On Tue, Mar 12, 2019 at 10:56 AM Gary Yao  wrote:

> Nobody can tell with 100% certainty. We want to give the RC some
> exposure
> first, and there is also a release process that is prescribed by the
> ASF [1].
> You can look at past releases to get a feeling for how long the release
> process lasts [2].
>
> [1] http://www.apache.org/legal/release-policy.html#release-approval
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0
>
>
> On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> And when is the 1.8.0 release expected ?
>>
>> On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> :) That makes so much more sense. Is  k8s native flink a part of
>>> this release ?
>>>
>>> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao 
>>> wrote:
>>>
 Hi Vishal,

 This issue was fixed recently [1], and the patch will be released
 with 1.8. If
 the Flink job gets cancelled, the JVM should exit with code 0.
 There is a
 release candidate [2], which you can test.

 Best,
 Gary

 [1] https://issues.apache.org/jira/browse/FLINK-10743
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html

 On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Thanks Vijay,
>
> This is the larger issue.  The cancellation routine is itself
> broken.
>
> On cancellation flink does remove the checkpoint counter
>
> *2019-03-12 14:12:13,143
> INFO  
> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
> Removing /checkpoint-counter/ from
> ZooKeeper *
>
> but exits with a non-zero code
>
> *2019-03-12 14:12:13,477
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint 
> with
> exit code 1444.*
>
>
> That I think is an issue. A cancelled job is a complete job and
> thus the exit code should be 0 for k8s to mark it complete.
>
>
>
>
>
>
>
>
>
> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
> bhaskar.eba...@gmail.com> wrote:
>
>> Yes Vishal. Thats correct.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This really not cool but here you go. This seems to work. Agreed
>>> that 

Question regarding "Insufficient number of network buffers"

2019-04-11 Thread Xiangfeng Zhu
Hello,

My name is Allen, and I'm currently researching different distributed
execution engines. I wanted to run some benchmarks on Flink with a 10-node
cluster (each node has 64 vCPUs and 376 GB memory). I ran the program with
parallelism 320 and got an error message:
"Caused by: java.io.IOException: Insufficient number of network buffers:
required 320, but only 128 available. The total number of network buffers
is currently set to 32768 of 32768 bytes each. You can increase this number
by setting the configuration keys 'taskmanager.network.memory.fraction',
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'."

Currently, I set the following parameters:
jobmanager.heap.size: 102400m
taskmanager.memory.size: 102400m
taskmanager.numberOfTaskSlots: 32
taskmanager.network.memory.min: 102400m
taskmanager.network.memory.max: 102400m
taskmanager.network.memory.fraction: 0.5
(For the last three fields, I've also tried to
set taskmanager.network.numberOfBuffers: 40960 directly)

Could you please give me some advice about how I should fix it?
Thank you so much!

Best,
Allen


Re: Hbase Connector failed when deployed to yarn

2019-04-11 Thread Yun Tang
Hi

I believe this is the same problem as the one reported in
https://issues.apache.org/jira/browse/FLINK-12163 ; the current workaround is to
put the flink-hadoop-compatibility jar under FLINK_HOME/lib.
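For anyone landing here, the workaround boils down to something like the following; the jar name and version are placeholders, so use the artifact matching your Flink and Scala versions, and do it on every node:

```
# copy the hadoop-compatibility jar into Flink's lib folder and restart the cluster
cp flink-hadoop-compatibility_2.11-1.8.0.jar $FLINK_HOME/lib/
```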

Best
Yun Tang

From: hai 
Sent: Thursday, April 11, 2019 21:06
To: user
Subject: Re: Hbase Connector failed when deployed to yarn


And my pom.xml dependencies are:








<dependencies>
    <!-- Scala -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- SL4J & Log4j & Kafka-Appender & Flume-Appender -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- 1.1.1 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>

    <!-- Flink -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib</artifactId>
      <version>2.2.2</version>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>





 Original Message
Sender: hai
Recipient: user@flink.apache.org
Date: Thursday, Apr 11, 2019 21:04
Subject: Hbase Connector failed when deployed to yarn


Hello:

I am new to Flink, and I copied the official HBase connector example from
source

flink/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
and ran it on a yarn-cluster with the command:

bin/flink run -m yarn-cluster -yn 2 -c {class-path-prefix}.HBaseWriteExample 
{my-application}.jar

What I got is:


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for 
the class 

Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

2019-04-11 Thread Jins George
Thank you Guowei. That was the trick!

By default, jobs in the Completed Jobs section expire and are removed after 1
hour. I have increased jobstore.expiration-time and now completed jobs are retained.
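For reference, the relevant flink-conf.yaml entry looks like this; the key takes a value in seconds, and 86400 (one day) below is only an example, not a recommendation:

jobstore.expiration-time: 86400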

Thanks,
Jins

From: Guowei Ma 
Date: Wednesday, April 10, 2019 at 3:29 AM
To: Jins George 
Cc: Timothy Victor , user 
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I am not very sure about this problem. But you could try to increase 
jobstore.expiration-time in config.
Best,
Guowei


Jins George <jins.geo...@aeris.net> wrote on Wed, Apr 10, 2019 at 1:01 PM:
Any input on this UI behavior ?

Thanks,
Jins

From: Timothy Victor <vict...@gmail.com>
Date: Monday, April 8, 2019 at 10:47 AM
To: Jins George <jins.geo...@aeris.net>
Cc: user <user@flink.apache.org>
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I face the same issue in Flink 1.7.1.

Would be good to know a solution.

Tim

On Mon, Apr 8, 2019, 12:45 PM Jins George <jins.geo...@aeris.net> wrote:
Hi,

I am facing a weird problem in which jobs from ‘Completed Jobs’ section in 
Flink 1.7.2 UI disappear.  Looking at the job manager logs, I see the job was 
failed and restarted  ‘restart-strategy.fixed-delay.attempts’ times and the 
JobMaster was stopped.
I was able to see the job in Completed Jobs section with the status as FAILED, 
but after some time,  I don’t see it any more.  The jobmanager was never 
restarted, so I expected the Failed or completed jobs to appear in Completed 
Jobs section.

Any idea what might be happening ?


JobManager.log:

2019-04-06 18:21:10,638 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
restart the job dwellalert-ubuntu-0403174608-698009a0 
(b274377e6a223078d6f40b9c0620ee0d) because the restart strategy prevented it.
2019-04-06 18:21:10,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
- Stopping the JobMaster for job 
dwellalert-ubuntu-0403174608-698009a0(b274377e6a223078d6f40b9c0620ee0d).

Restart Strategy Conf:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s


Thanks
Jins George


Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-11 Thread Vijay Balakrishnan
Thx for all your replies. Solved the problem by skirting the issue. I
pre-populated the incoming Monitoring Object on intake with the dynamic
runtime fields keyName and keyValue and that way, I could use the static
call as used in all the other if conditions:
  monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId",
"eventName", "component", "keyName","keyValue");

The reason I wanted to use Tuple was that I was passing this
KeyedStream to a common method that could handle the Tuple accordingly.

I tried using  [ keyBy(KeySelector, TypeInformation) ] but the compiler
complained that I need to use Monitoring, Tuple6 in that particular case.
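For completeness, here is roughly what Tim's POJO suggestion looks like with a dedicated key type, sketched in Scala terms rather than the thread's Java API; the Monitoring stand-in, its field names, and MonitoringKey are made up for illustration:

```
import org.apache.flink.streaming.api.scala._

// Made-up stand-ins for the thread's Monitoring type and its six key fields.
case class Monitoring(deployment: String, gameId: String, eventName: String,
                      component: String, keyName: String, keyValue: String, value: Double)

case class MonitoringKey(deployment: String, gameId: String, eventName: String,
                         component: String, keyName: String, keyValue: String)

object PojoKeyExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[Monitoring] =
      env.fromElements(Monitoring("d1", "g1", "e1", "c1", "k1", "v1", 1.0))

    // Keying on a dedicated case class keeps every field named and analyzable,
    // which avoids the "Tuple as a type is not allowed" problem entirely.
    val keyed = stream.keyBy(m =>
      MonitoringKey(m.deployment, m.gameId, m.eventName, m.component, m.keyName, m.keyValue))

    keyed.print()
    env.execute("pojo-key-example")
  }
}
```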

Vijay

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma 
wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor  wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data.   This, I think, would solve your problem.  But beyond that I think
>> the code will be more understandable.  It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph.   Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback.   And I know I gave
>> it without much context to your problem.  So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler 
>> wrote:
>>
>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify a
>>> .returns(new TypeHint<Tuple6<String,String,String,String,String,String>>(){}) with KeyedStream ??
>>>
 monitoringTupleKeyedStream = kinesisStream.keyBy(new
 KeySelector() {
 public Tuple getKey(Monitoring mon) throws Exception 
 {..return
 new Tuple6<>(..}})
>>>
>>> I tried using
>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>
 kinesisStream.keyBy(new KeySelector() {...}, info);
 //specify typeInfo through

>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor  wrote:
>>>
 Flink needs type information for serializing and deserializing objects,
 and that is lost due to Java type erasure.   The only way to workaround
 this is to specify the return type of the function called in the lambda.

 Fabian's answer here explains it well.


 https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554

 Tim

 On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan 
 wrote:

> Hi,
> I am trying to use the KeyedStream with Tuple to handle diffrent types
> of Tuples including Tuple6.
> Keep getting the Exception:
> *Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of 
> class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead*.
> Is there a way around Type Erasure here ?
> I want to use KeyedStream so that I can pass it on
> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>
> Code below:
>
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>> String keyOperationType = ;//provided
>> if (StringUtils.isNotEmpty(keyOperationType)) {
>> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION))
>> {
>> monitoringTupleKeyedStream =
>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>> } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>> monitoringTupleKeyedStream =
>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>> "instance");
>> } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>> TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector() {
>> public Tuple getKey(Monitoring mon) throws Exception 

RE: Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread min.tan
Many thanks for your replies.

After I increased MinPauseBetweenCheckpoints and moved to a memory backend for
checkpoints, the exception disappeared.

Thank you both again for your help.


Regards,

Min
From: Piotr Nowojski [mailto:pi...@ververica.com]
Sent: Donnerstag, 11. April 2019 15:01
To: Fabian Hueske
Cc: Tan, Min; user
Subject: [External] Re: Default Kafka producers pool size for 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Hi Min and Fabian,

The pool size is independent of the parallelism, task slots count or task 
managers count. The only thing that you should consider is how many 
simultaneous checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

The default value of the pool size of 5 should be more than enough.

Could you double check if something is not overriding those configuration 
values? If not could you provide the JobManager and TaskManager logs?

Piotrek


On 11 Apr 2019, at 09:32, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent 
of the parallelism of the sink operator.
From my understanding a pool size of 5 should be fine if the maximum number of
concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight checkpoints 
that were not completed, which seems a lot to me (given that the sink is 
probably at the end of the program).

If I remember correctly, Piotr (in CC) was working on the exactly-once feature 
of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon, Apr 8, 2019 at 14:43, min@ubs.com wrote:
Hi,

I keep getting exceptions 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
ongoing snapshots. Increase kafka producers pool size or decrease number of 
concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and I need to increase 
this size. What considerations should I take into account when increasing it? What 
is a reasonable size for a normal setup, e.g. 32?

I run with a parallelism of 16 and have a checkpoint setting like this:

public static void setup(StreamExecutionEnvironment env) {
env.enableCheckpointing(2_000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
env.getCheckpointConfig().setCheckpointTimeout(60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));

//env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min



Flink Forward Europe 2019 - Call for Presentations open until 17th May

2019-04-11 Thread Fabian Hueske
Hi all,

Flink Forward Europe returns to Berlin on October 7-9th, 2019.
We are happy to announce that the Call for Presentations is open!

Please submit a proposal if you'd like to present your Apache Flink
experience, best practices, new features, or use cases in front of an
international audience of highly skilled and enthusiastic Flink users and
committers.

Flink Forward will run tracks for the following topics:
* Use Case
* Operations
* Technology Deep Dive
* Ecosystem
* Research

For the first time, we'll also have a Community track.

Please find the submission form at
https://berlin-2019.flink-forward.org/call-for-presentations

The deadline for submissions is May 17th, 11:59pm (CEST).

Best regards,
Fabian
(PC Chair for Flink Forward Berlin 2019)


Re: Hbase Connector failed when deployed to yarn

2019-04-11 Thread hai
And my pom.xml dependencies are:


<dependencies>
    <!-- Scala -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- SL4J & Log4j & Kafka-Appender & Flume-Appender -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- 1.1.1 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>

    <!-- Flink -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib</artifactId>
      <version>2.2.2</version>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>


Original Message
Sender:hai...@magicsoho.com
Recipient:user@flink.apache.org
Date:Thursday, Apr 11, 2019 21:04
Subject:Hbase Connector failed when deployed to yarn


Hello:
  I am new to Flink, and I copied the official HBase connector example from
source
flink/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
and ran it on a yarn-cluster with the command:


bin/flink run -m yarn-cluster -yn 2 -c {class-path-prefix}.HBaseWriteExample 
{my-application}.jar

What I got is:



The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for 
the class 'org.apache.hadoop.io.Writable'. You may be missing the 
'flink-hadoop-compatibility' dependency.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
at 

Hbase Connector failed when deployed to yarn

2019-04-11 Thread hai
Hello:
  I am new to Flink, and I copied the official HBase connector example from
source
flink/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
and ran it on a yarn-cluster with the command:


bin/flink run -m yarn-cluster -yn 2 -c {class-path-prefix}.HBaseWriteExample 
{my-application}.jar

What I got is:



The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for 
the class 'org.apache.hadoop.io.Writable'. You may be missing the 
'flink-hadoop-compatibility' dependency.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:791)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:621)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:425)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
at com.luckyfish.flink.java.HBaseWriteExample.main(HBaseWriteExample.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
... 13 more


What should I do to deal with this exception ?


Many Thanks

Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread Piotr Nowojski
Hi Min and Fabian,

The pool size is independent of the parallelism, task slots count or task 
managers count. The only thing that you should consider is how many 
simultaneous checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

The default value of the pool size of 5 should be more than enough. 

Could you double check if something is not overriding those configuration 
values? If not could you provide the JobManager and TaskManager logs?

Piotrek

> On 11 Apr 2019, at 09:32, Fabian Hueske  wrote:
> 
> Hi Min,
> 
> I think the pool size is per parallel sink task, i.e., it should be 
> independent of the parallelism of the sink operator.
> From my understanding a pool size of 5 should be fine if the maximum number 
> of concurrent checkpoints is 1.
> Running out of connections would mean that there are 5 in-flight checkpoints 
> that were not completed, which seems a lot to me (given that the sink is 
> probably at the end of the program).
> 
> If I remember correctly, Piotr (in CC) was working on the exactly-once 
> feature of the Kafka producer.
> Maybe he can help.
> 
> Best,
> Fabian
> 
> On Mon, Apr 8, 2019 at 14:43,  > wrote:
> Hi,
> 
>  
> 
> I keep getting exceptions 
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
> ongoing snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints."
> 
>  
> 
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and I need to increase 
> this size. What considerations should I take into account when increasing it? What 
> is a reasonable size for a normal setup, e.g. 32?
> 
>  
> 
> I run with a parallelism of 16 and have a checkpoint setting like this:
> 
>  
> 
> public static void setup(StreamExecutionEnvironment env) {
> 
> env.enableCheckpointing(2_000);
> 
> 
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
> 
> env.getCheckpointConfig().setCheckpointTimeout(60_000);
> 
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));
> 
> 
> //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> }
> 
>  
> 
> Regards,
> 
>  
> 
> Min
> 



Re: Query on job restoration using relocated savepoint

2019-04-11 Thread Stefan Richter
Hi,

the first case sounds like you made a mistake when editing the paths manually 
and deleted one or more bytes that were not part of the path and thus 
corrupted the meta data. For the second approach, of course you also need to 
replace the paths after reading and before rewriting the metadata. This 
approach is basically the programmatic version of your first attempt, but using 
Flink’s code to avoid the pitfalls of corrupting the file.

Best,
Stefan 

> On 10. Apr 2019, at 19:16, Parth Sarathy  wrote:
> 
> Hi All,
>   We are trying to restore a job using relocated savepoint
> files. As pointed out in the FAQs of savepoint documentation, savepoints
> have absolute paths recorded in them and hence a simple relocation to
> restore the job would fail. As directed in the documentation we tried out
> the simple way to refactor the paths by editing them manually, but the job
> submission failed with an IllegalStateException as noted below :
> Caused by: java.lang.IllegalStateException: Reading invalid
> OperatorStateHandle, type: 50
>at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeOperatorStateHandle(SavepointV2Serializer.java:499)
> 
> We then went ahead and gave a swing at the second prescribed option of
> utilizing the SavepointV2Serializer for deserializing and serializing the
> metadata file. Even with this approach we observed that the generated
> metadata file still referenced the old absolute path. We are stuck in a
> predicament as of now. How is it that we can set / change the absolute paths
> present in the metadata file using the prescribed SavepointV2Serializer.
> It’d be helpful if you could provide some insight into this.
> 
> Thanks,
> Parth Sarathy
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Query on job restoration using relocated savepoint

2019-04-11 Thread Stefan Richter
Small correction on the first case: more likely, you changed the path string, but
I think those strings are length-prefixed, so the length would require manual
adjustment as well to avoid corrupting the metadata.

> On 11. Apr 2019, at 14:42, Stefan Richter  wrote:
> 
> Hi,
> 
> the first case sounds like you made a mistake when editing the paths manually 
> and deleted one or more bytes that were not part of the path and thus 
> corrupted the meta data. For the second approach, of course you also need to 
> replace the paths after reading and before rewriting the metadata. This 
> approach is basically the programmatic version of your first attempt, but 
> using Flink’s code to avoid the pitfalls of corrupting the file.
> 
> Best,
> Stefan 
> 
>> On 10. Apr 2019, at 19:16, Parth Sarathy  
>> wrote:
>> 
>> Hi All,
>>  We are trying to restore a job using relocated savepoint
>> files. As pointed out in the FAQs of savepoint documentation, savepoints
>> have absolute paths recorded in them and hence a simple relocation to
>> restore the job would fail. As directed in the documentation we tried out
>> the simple way to refactor the paths by editing them manually, but the job
>> submission failed with an IllegalStateException as noted below :
>> Caused by: java.lang.IllegalStateException: Reading invalid
>> OperatorStateHandle, type: 50
>>   at
>> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeOperatorStateHandle(SavepointV2Serializer.java:499)
>> 
>> We then went ahead and gave a swing at the second prescribed option of
>> utilizing the SavepointV2Serializer for deserializing and serializing the
>> metadata file. Even with this approach we observed that the generated
>> metadata file still referenced the old absolute path. We are stuck in a
>> predicament as of now. How is it that we can set / change the absolute paths
>> present in the metadata file using the prescribed SavepointV2Serializer.
>> It’d be helpful if you could provide some insight into this.
>> 
>> Thanks,
>> Parth Sarathy
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: What is the best way to handle data skew processing in Data Stream applications?

2019-04-11 Thread Felipe Gutierrez
Thanks all for your suggestions!

I am not sure whether option 3 that Fabian described requires changing the
Flink source code or whether it can be implemented on top of Flink.
-
3) One approach to improve the processing of skewed data, is to change how
keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both.
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed
state.
-
I did an experiment with a Mapper function that maps to a key with one more
parameter (a skew parameter). The results are better.

Integer skewParameter = 0;
if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3)))
{ // this is the skewed key
skewParameter = this.skewParameterGenerator.getNextItem();
}
CompositeSkewedKeyStationPlatform compositeKey = new
CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);

But it is still a static solution =(. I mean, the developer has to specify in
the Mapper which key is skewed.
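A more generic variant of the same idea is to salt every key with a small bounded random value and aggregate in two steps (pre-aggregate per salted key, then combine per original key). The sketch below only illustrates that pattern, it is not code from this thread, and the salt range of 4 is arbitrary:

```
import scala.util.Random

// Salt every key with a small bounded random value: a hot (stationId, platformId)
// pair is spread over `saltRange` groups for the first aggregation, and a second
// aggregation keyed only by (stationId, platformId) combines the partial results.
object KeySaltingSketch extends App {
  val saltRange = 4

  def saltedKey(stationId: Int, platformId: Int): (Int, Int, Int) =
    (stationId, platformId, Random.nextInt(saltRange))

  // The hot key from the experiment above now lands in several groups.
  println((1 to 10).map(_ => saltedKey(2, 3)).distinct)
}
```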

Best,
Felipe

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com


On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann  wrote:

> Just a small addition:
>
> If two hot keys fall into two key groups which are being processed by the
> same TM, then it could help to change the parallelism, because then the key
> group mapping might be different.
>
> If two hot keys fall into the same key group, you can adjust the max
> parallelism which defines how many key groups will be used. By changing the
> number, it might happen that the two hot keys fall into different key
> groups.
>
> Cheers,
> Till
>
> On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske  wrote:
>
>> Hi Felipe,
>>
>> three comments:
>>
>> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
>> effect:
>> keyBy() introduces a hash partitioning such that any data partitioning
>> that you do immediately before keyBy() is destroyed.
>> You only change the distribution for the call of the key extractor which
>> should be a lightweight function anyway.
>> That's why you do not see any difference between the three methods.
>>
>> 2) windowAll() defines a non-keyed window over the whole stream.
>> All records are processed by the same non-parallel instance of the window
>> operator.
>> That's why assigning a higher parallelism to that operator does not help.
>>
>> 3) One approach to improve the processing of skewed data, is to change
>> how keyed state is handled.
>> Flink's keyed state is partitioned in two steps:
>> 1. each key is assigned to a key group based on an internal hash
>> function.
>> 2. each key group is assigned to and processed by a parallel operator
>> task.
>> For full control over data placement, you need to control both.
>> Changing 1) is tricky because it affects savepoint compatibility.
>> Changing 2) does not help if two hot keys are assigned to the same keyed
>> state.
>>
>> Best, Fabian
>>
>> On Wed, 10 Apr 2019 at 11:50, Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am studying data skew processing in Flink and how I can change the
>>> low-level control of physical partition (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>>> in order to have an even processing of tuples. I have created synthetic
>>> skewed data sources and I aim to process (aggregate) them over a window.
>>> Here is the complete code:
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>>
>>> streamTrainsStation01.union(streamTrainsStation02)
>>> .union(streamTicketsStation01).union(streamTicketsStation02)
>>> // map the keys
>>> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>> .rebalance() // or .rescale() .shuffle()
>>> .keyBy(new StationPlatformKeySelector())
>>> .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>> .apply(new
>>> StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>> .setParallelism(4)
>>> .map(new
>>> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
>>> topic)).name(metricSinkFunction)
>>> ;
>>>
>>> According to the Flink dashboard I 

Re: Scalaj vs akka as http client for Asyncio Flink

2019-04-11 Thread Till Rohrmann
Hi Andy,

without being an expert of Akka's http client, I think you should not
create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
What I would recommend you instead is to implement a `RichAsyncFunction`
with a transient field for `ActorMaterializer` which you initialize in the
`RichAsyncFunction#open` method. That way you only create the
`ActorMaterializer` on the `TaskManager` where the operator is executed and
solve the problem of serializability and you make it much more efficient
because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang  wrote:

> Hi guys,
>
> I'm trying to decide which http client to use with Flink. Currently I have tested
> the scalaj and akka http clients and both work OK with our current dev
> environment.
> For scalaj it is pretty straightforward since it is just calling an
> http request with its timeout.
>
> For the akka http client it is a bit more complicated (I'm new to Scala and
> all), so I'm asking whether I'm doing it right by creating an AsyncFunction like
> this
> ```
>
> class AsyncHttpClient( args: Array[String] = Array()) extends 
> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>
>   override def asyncInvoke(input: Shipment, resultFuture: 
> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>
> PPLogger.getActivityLogger.info("###INIT --- ")
> implicit val system = ActorSystem("my-system")
> implicit val executionContext = system.dispatcher
> implicit val materializer: ActorMaterializer = ActorMaterializer()
> val resultFutureRequested: Future[HttpResponse] = 
> Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
> PPLogger.getActivityLogger.info("###DONE --- ")
>
>
> resultFutureRequested.onComplete {
>   case Success(res) => {
> resultFuture.complete(Iterable(Right(res.entity)))
>   }
>   case Failure(x)   => {
> resultFuture.complete(Iterable(Left(x)))
>   }
> }
>
>   }
>
>   override def timeout(input: Shipment, resultFuture: 
> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
> resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
>
> ```
> I noticed that I have to implicitly create a bunch of variables inside the
> asyncInvoke method. I'm not sure if I'm doing it right, or just adding
> overhead. I did try to init them in the constructor of the class but the
> compiler just threw a bunch of serialization errors.
>
> My lib:
>"com.typesafe.akka" %% "akka-http" % "10.1.8",
>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>
> My flink:
> scala 2.12
> flink 1.70
>
>
>
> Any reply are appreciated!
>
> Thanks a bunch
>
> Andy,
>
>
>
>


Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-11 Thread Richard Deurwaarder
Very nice! Thanks Aljoscha and all contributors!

I have one question: will the Docker image for 1.8.0 be released soon as
well? https://hub.docker.com/_/flink has the versions up to 1.7.2.

Regards,

Richard

On Wed, Apr 10, 2019 at 4:54 PM Rong Rong  wrote:

> Congrats! Thanks Aljoscha for being the release manager and all for making
> the release possible.
>
> --
> Rong
>
>
> On Wed, Apr 10, 2019 at 4:23 AM Stefan Richter 
> wrote:
>
>> Congrats and thanks to Aljoscha for managing the release!
>>
>> Best,
>> Stefan
>>
>> > On 10. Apr 2019, at 13:01, Biao Liu  wrote:
>> >
>> > Great news! Thanks Aljoscha and all the contributors.
>> >
>> > Till Rohrmann <trohrm...@apache.org>
>> wrote on Wed, Apr 10, 2019 at 6:11 PM:
>> > Thanks a lot to Aljoscha for being our release manager and to the
>> community making this release possible!
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Apr 10, 2019 at 12:09 PM Hequn Cheng > > wrote:
>> > Thanks a lot for the great release Aljoscha!
>> > Also thanks for the work by the whole community. :-)
>> >
>> > Best, Hequn
>> >
>> > On Wed, Apr 10, 2019 at 6:03 PM Fabian Hueske > > wrote:
>> > Congrats to everyone!
>> >
>> > Thanks Aljoscha and all contributors.
>> >
>> > Cheers, Fabian
>> >
>> > On Wed, Apr 10, 2019 at 11:54 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>> > Cool!
>> >
>> > Thanks Aljoscha a lot for being our release manager, and all the others
>> who make this release possible.
>> >
>> > Best, Congxian
>> > On Apr 10, 2019, 17:47 +0800, Jark Wu <imj...@gmail.com> wrote:
>> > > Cheers!
>> > >
>> > > Thanks Aljoscha and all others who make 1.8.0 possible.
>> > >
>> > > On Wed, 10 Apr 2019 at 17:33, vino yang wrote:
>> > >
>> > > > Great news!
>> > > >
>> > > > Thanks Aljoscha for being the release manager and thanks to all the
>> > > > contributors!
>> > > >
>> > > > Best,
>> > > > Vino
>> > > >
>> > > > Driesprong, Fokko wrote on Wed, Apr 10, 2019 at 4:54 PM:
>> > > >
>> > > > > Great news! Great effort by the community to make this happen.
>> Thanks all!
>> > > > >
>> > > > > Cheers, Fokko
>> > > > >
>> > > > > On Wed, Apr 10, 2019 at 10:50 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks Aljoscha and all others who made contributions to FLINK
>> 1.8.0.
>> > > > > > Looking forward to FLINK 1.9.0.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Shaoxuan
>> > > > > >
>> > > > > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> > > > > >
>> > > > > > > The Apache Flink community is very happy to announce the
>> release of
>> > > > > > Apache
>> > > > > > > Flink 1.8.0, which is the next major release.
>> > > > > > >
>> > > > > > > Apache Flink® is an open-source stream processing framework
>> for
>> > > > > > > distributed, high-performing, always-available, and accurate
>> data
>> > > > > > streaming
>> > > > > > > applications.
>> > > > > > >
>> > > > > > > The release is available for download at:
>> > > > > > > https://flink.apache.org/downloads.html
>> > > > > > >
>> > > > > > > Please check out the release blog post for an overview of the
>> > > > > > improvements
>> > > > > > > for this bugfix release:
>> > > > > > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html
>> > > > > > >
>> > > > > > > The full release notes are available in Jira:
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274
>> > > > > > >
>> > > > > > > We would like to thank all contributors of the Apache Flink
>> community
>> > > > > who
>> > > > > > > made this release possible!
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Aljoscha
>> > > > > >
>> > > > >
>> > > >
>>
>>


Re: What is the best way to handle data skew processing in Data Stream applications?

2019-04-11 Thread Till Rohrmann
Just a small addition:

If two hot keys fall into two key groups which are being processed by the
same TM, then it could help to change the parallelism, because then the key
group mapping might be different.

If two hot keys fall into the same key group, you can adjust the max
parallelism which defines how many key groups will be used. By changing the
number, it might happen that the two hot keys fall into different key
groups.
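
If you want to check this up front, you can compute the placement offline with
Flink's KeyGroupRangeAssignment utility (an internal class, so treat this as an
illustration only; the key values below are made up):

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object HotKeyPlacement {
  def main(args: Array[String]): Unit = {
    val hotKeys = Seq("stationA", "stationB") // hypothetical hot keys
    for (maxParallelism <- Seq(128, 256); parallelism <- Seq(4, 8); key <- hotKeys) {
      // Step 1: key -> key group (depends only on the key and the max parallelism).
      val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
      // Step 2: key group -> parallel subtask of the keyed operator.
      val subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
        maxParallelism, parallelism, keyGroup)
      println(s"key=$key maxParallelism=$maxParallelism parallelism=$parallelism " +
        s"-> keyGroup=$keyGroup subtask=$subtask")
    }
  }
}
```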

Cheers,
Till

On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske  wrote:

> Hi Felipe,
>
> three comments:
>
> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
> effect:
> keyBy() introduces a hash partitioning such that any data partitioning
> that you do immediately before keyBy() is destroyed.
> You only change the distribution for the call of the key extractor which
> should be a lightweight function anyway.
> That's why you do not see any difference between the three methods.
>
> 2) windowAll() defines a non-keyed window over the whole stream.
> All records are processed by the same non-parallel instance of the window
> operator.
> That's why assigning a higher parallelism to that operator does not help.
>
> 3) One approach to improve the processing of skewed data, is to change how
> keyed state is handled.
> Flink's keyed state is partitioned in two steps:
> 1. each key is assigned to a key group based on an internal hash function.
> 2. each key group is assigned to and processed by a parallel operator task.
> For full control over data placement, you need to control both.
> Changing 1) is tricky because it affects savepoint compatibility.
> Changing 2) does not help if two hot keys are assigned to the same key
> group.
>
> Best, Fabian
>
> On Wed, Apr 10, 2019 at 11:50 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I am studying data skew processing in Flink and how I can change the
>> low-level control of physical partition (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>> in order to have an even processing of tuples. I have created synthetic
>> skewed data sources and I aim to process (aggregate) them over a window.
>> Here is the complete code:
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>
>> streamTrainsStation01.union(streamTrainsStation02)
>> .union(streamTicketsStation01).union(streamTicketsStation02)
>> // map the keys
>> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>> .rebalance() // or .rescale() .shuffle()
>> .keyBy(new StationPlatformKeySelector())
>> .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>> .apply(new
>> StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>> .setParallelism(4)
>> .map(new
>> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
>> topic)).name(metricSinkFunction)
>> ;
>>
>> According to the Flink dashboard I could not see too much difference
>> among .shuffle(), .rescale(), and .rebalance(). Even though the
>> documentation says rebalance() transformation is more suitable for data
>> skew.
>>
>> After that I tried to use .partitionCustom(partitioner, "someKey").
> However, to my surprise, I could not use setParallelism(4) on the window
>> operation. The documentation says "Note: This operation is inherently
>> non-parallel since all elements have to pass through the same operator
>> instance.". I did not understand why. If I am allowed to do partitionCustom
>> why can't I use parallelism after that?
>> Here is the complete code:
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>>
>> streamTrainsStation01.union(streamTrainsStation02)
>> .union(streamTicketsStation01).union(streamTicketsStation02)
>> // map the keys
>> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>> .partitionCustom(new StationPlatformKeyCustomPartitioner(), new
>> StationPlatformKeySelector())
>> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>> .apply(new
>> StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>> .map(new
>> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
>> topic)).name(metricSinkFunction)
>> ;
>>
>> Thanks,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>


Re: Query on job restoration using relocated savepoint

2019-04-11 Thread Till Rohrmann
Hi Parth,

I've pulled Stefan into the conversation who might be able to help you with
your problem.

Cheers,
Till

On Wed, Apr 10, 2019 at 7:17 PM Parth Sarathy 
wrote:

> Hi All,
>We are trying to restore a job using relocated savepoint
> files. As pointed out in the FAQs of savepoint documentation, savepoints
> have absolute paths recorded in them and hence a simple relocation to
> restore the job would fail. As directed in the documentation we tried out
> the simple way to refactor the paths by editing them manually, but the job
> submission failed with an IllegalStateException as noted below :
> Caused by: java.lang.IllegalStateException: Reading invalid
> OperatorStateHandle, type: 50
> at
>
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeOperatorStateHandle(SavepointV2Serializer.java:499)
>
> We then went ahead and gave a swing at the second prescribed option of
> utilizing the SavepointV2Serializer for deserializing and serializing the
> metadata file. Even with this approach we observed that the generated
> metadata file still referenced the old absolute path. We are stuck in a
> predicament as of now. How can we set / change the absolute paths
> present in the metadata file using the prescribed SavepointV2Serializer?
> It’d be helpful if you could provide some insight into this.
>
> Thanks,
> Parth Sarathy
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Scalaj vs akka as http client for Asyncio Flink

2019-04-11 Thread Andy Hoang
Hi guys,

I’m trying to decide which HTTP client to use with Flink. So far I have tested
scalaj and the akka HTTP client, and both work fine in our current dev environment.
Scalaj is pretty straightforward since it just issues an HTTP request with
a timeout.

The akka HTTP client is a bit more involved (I’m new to Scala and all), so
I’m asking whether I’m doing it right by creating an AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends 
AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

  override def asyncInvoke(input: Shipment, resultFuture: 
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###INIT --- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = 
Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###DONE --- ")


resultFutureRequested.onComplete {
  case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
  }
  case Failure(x)   => {
resultFuture.complete(Iterable(Left(x)))
  }
}

  }

  override def timeout(input: Shipment, resultFuture: 
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function
call has timed out."))))
  }
}
```
I notice that I have to implicitly create a bunch of variables inside the
asyncInvoke method. I’m not sure whether I’m doing it right or just adding
overhead. I did try to initialize them in the constructor of the class, but
the compiler just threw a bunch of serialization errors.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
Flink 1.7.0



Any replies are appreciated!

Thanks a bunch

Andy,





Re: FlinkCEP and SQL?

2019-04-11 Thread Dawid Wysakowicz
The documentation should say which parts are supported and which are
not. I would say the majority of the important features work.

Best,

Dawid

On 11/04/2019 12:40, Esa Heikkinen (TAU) wrote:
>
> Hi
>
>  
>
> Thank you. I need to read them. Does this all work in Flink 1.8 now ?
>
>  
>
> BR Esa
>
>  
>
> *From:*Dawid Wysakowicz 
> *Sent:* Thursday, April 11, 2019 12:59 PM
> *To:* Esa Heikkinen (TAU) ; Fabian Hueske
> 
> *Cc:* Dian Fu ; jincheng sun
> ; user@flink.apache.org
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Hi Esa,
>
> Have you checked out the flink documentation for this topic[1]? Is
> there something you are missing from there? Also the MATCH_RECOGNIZE
> is described in the SQL:2016 standard[2].
>
> For the Flink CEP library, yes it is inspired by the paper you mentioned.
>
> Best,
>
> Dawid
>
>  
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/match_recognize.html
>
> [2]
> https://standards.iso.org/ittf/PubliclyAvailableStandards/c065143_ISO_IEC_TR_19075-5_2016.zip
>
> On 11/04/2019 10:52, Esa Heikkinen (TAU) wrote:
>
> Hi
>
>  
>
> Is there any good scientific papers about using SQL
> MATCH_RECOGNIZE in Flink and CEP ?
>
>  
>
> The only one good paper about Flink and CEP that I have received
> was “Efficient Pattern Matching over Event Streams” (SIGMOD 2008).
> Its SASE+ Event Pattern Language (EPL) is best I have met. I have
> been told that FlinkCEP is based on this.
>
>  
>
> BR Esa
>
>  
>
> *From:*Fabian Hueske  
> *Sent:* Thursday, April 11, 2019 11:33 AM
> *To:* Esa Heikkinen (TAU) 
> 
> *Cc:* Dian Fu 
> ; jincheng sun
>  ;
> user@flink.apache.org 
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Hi Esa,
>
>  
>
> Flink's implementation of SQL MATCH_RECOGNIZE is based on it's CEP
> library, i.e., they share the same implementation.
>
>  
>
> Best, Fabian
>
>  
>
> Am Do., 11. Apr. 2019 um 10:29 Uhr schrieb Esa Heikkinen (TAU)
> mailto:esa.heikki...@tuni.fi>>:
>
> Hi
>
>  
>
> Is SQL CEP based (old) FlinkCEP at all and are SQL CEP and
> FlinkCEP completely separate ?
>
>  
>
> BR Esa
>
>  
>
> *From:*Dian Fu  >
> *Sent:* Thursday, April 4, 2019 2:37 PM
> *To:* Esa Heikkinen (TAU)  >
> *Cc:* jincheng sun  >; user@flink.apache.org
> 
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Should the all sources be combined into one big table
> before operations with SQL CEP?
>
>  
>
> Yes, you should combine them into one table/stream.
>
>  
>
> Regards,
>
> Dian
>
>  
>
> On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:
>
>  
>
> Hi
>
>  
>
> Thank you for the information. How this SQL CEP is
> applicable for situation where there are many sources with
> different type of events ? Should the all sources be
> combined into one big table before operations with SQL CEP?
>
>  
>
> BR Esa
>
>  
>
> *From:* jincheng sun  > 
> *Sent:* Thursday, April 4, 2019 1:05 PM
> *To:* Esa Heikkinen (TAU)  >
> *Cc:* user@flink.apache.org 
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Hi BR Esa,
>
> CEP is available in Flink SQL, Please the detail
> here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
>
> Best,
>
> Jincheng
>
>  
>
> Esa Heikkinen (TAU) wrote on Thu, Apr 4, 2019 at 4:44 PM:
>
> Hi
>
>  
>
> What is the situation of FlinkCEP and SQL?
>
>  
>
> Is it already possible to use SQL in CEP?
>
>  
>
> Is there any example cases where SQL is used in CEP?
>
>  
>
> BR Esa
>
>  
>




RE: FlinkCEP and SQL?

2019-04-11 Thread Esa Heikkinen (TAU)
Hi

Thank you. I need to read them. Does this all work in Flink 1.8 now ?

BR Esa

From: Dawid Wysakowicz 
Sent: Thursday, April 11, 2019 12:59 PM
To: Esa Heikkinen (TAU) ; Fabian Hueske 

Cc: Dian Fu ; jincheng sun ; 
user@flink.apache.org
Subject: Re: FlinkCEP and SQL?


Hi Esa,

Have you checked out the flink documentation for this topic[1]? Is there 
something you are missing from there? Also the MATCH_RECOGNIZE is described in 
the SQL:2016 standard[2].

For the Flink CEP library, yes it is inspired by the paper you mentioned.

Best,

Dawid



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/match_recognize.html

[2] 
https://standards.iso.org/ittf/PubliclyAvailableStandards/c065143_ISO_IEC_TR_19075-5_2016.zip
On 11/04/2019 10:52, Esa Heikkinen (TAU) wrote:
Hi

Is there any good scientific papers about using SQL MATCH_RECOGNIZE in Flink 
and CEP ?

The only one good paper about Flink and CEP that I have received was “Efficient 
Pattern Matching over Event Streams” (SIGMOD 2008). Its SASE+ Event Pattern 
Language (EPL) is best I have met. I have been told that FlinkCEP is based on 
this.

BR Esa

From: Fabian Hueske 
Sent: Thursday, April 11, 2019 11:33 AM
To: Esa Heikkinen (TAU) 
Cc: Dian Fu ; jincheng sun 
; 
user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Hi Esa,

Flink's implementation of SQL MATCH_RECOGNIZE is based on it's CEP library, 
i.e., they share the same implementation.

Best, Fabian

On Thu, Apr 11, 2019 at 10:29 AM Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:
Hi

Is SQL CEP based (old) FlinkCEP at all and are SQL CEP and FlinkCEP completely 
separate ?

BR Esa

From: Dian Fu mailto:dian0511...@gmail.com>>
Sent: Thursday, April 4, 2019 2:37 PM
To: Esa Heikkinen (TAU) mailto:esa.heikki...@tuni.fi>>
Cc: jincheng sun mailto:sunjincheng...@gmail.com>>; 
user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Should the all sources be combined into one big table before operations with 
SQL CEP?

Yes, you should combine them into one table/stream.

Regards,
Dian

On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:

Hi

Thank you for the information. How this SQL CEP is applicable for situation 
where there are many sources with different type of events ? Should the all 
sources be combined into one big table before operations with SQL CEP?

BR Esa

From: jincheng sun mailto:sunjincheng...@gmail.com>>
Sent: Thursday, April 4, 2019 1:05 PM
To: Esa Heikkinen (TAU) mailto:esa.heikki...@tuni.fi>>
Cc: user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Hi BR Esa,
CEP is available in Flink SQL. Please see the details here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
Best,
Jincheng

Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote on Thu, Apr 4, 2019 at 4:44 PM:

Hi



What is the situation of FlinkCEP and SQL?



Is it already possible to use SQL in CEP?



Is there any example cases where SQL is used in CEP?



BR Esa



Re: FlinkCEP and SQL?

2019-04-11 Thread Dawid Wysakowicz
Hi Esa,

Have you checked out the flink documentation for this topic[1]? Is there
something you are missing from there? Also the MATCH_RECOGNIZE is
described in the SQL:2016 standard[2].

For the Flink CEP library, yes it is inspired by the paper you mentioned.

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/match_recognize.html

[2]
https://standards.iso.org/ittf/PubliclyAvailableStandards/c065143_ISO_IEC_TR_19075-5_2016.zip

On 11/04/2019 10:52, Esa Heikkinen (TAU) wrote:
>
> Hi
>
>  
>
> Is there any good scientific papers about using SQL MATCH_RECOGNIZE in
> Flink and CEP ?
>
>  
>
> The only one good paper about Flink and CEP that I have received was
> “Efficient Pattern Matching over Event Streams” (SIGMOD 2008). Its
> SASE+ Event Pattern Language (EPL) is best I have met. I have been
> told that FlinkCEP is based on this.
>
>  
>
> BR Esa
>
>  
>
> *From:*Fabian Hueske 
> *Sent:* Thursday, April 11, 2019 11:33 AM
> *To:* Esa Heikkinen (TAU) 
> *Cc:* Dian Fu ; jincheng sun
> ; user@flink.apache.org
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Hi Esa,
>
>  
>
> Flink's implementation of SQL MATCH_RECOGNIZE is based on it's CEP
> library, i.e., they share the same implementation.
>
>  
>
> Best, Fabian
>
>  
>
> On Thu, Apr 11, 2019 at 10:29 AM Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:
>
> Hi
>
>  
>
> Is SQL CEP based (old) FlinkCEP at all and are SQL CEP and
> FlinkCEP completely separate ?
>
>  
>
> BR Esa
>
>  
>
> *From:*Dian Fu mailto:dian0511...@gmail.com>>
> *Sent:* Thursday, April 4, 2019 2:37 PM
> *To:* Esa Heikkinen (TAU)  >
> *Cc:* jincheng sun  >; user@flink.apache.org
> 
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Should the all sources be combined into one big table before
> operations with SQL CEP?
>
>  
>
> Yes, you should combine them into one table/stream.
>
>  
>
> Regards,
>
> Dian
>
>  
>
> On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:
>
>  
>
> Hi
>
>  
>
> Thank you for the information. How this SQL CEP is applicable
> for situation where there are many sources with different type
> of events ? Should the all sources be combined into one big
> table before operations with SQL CEP?
>
>  
>
> BR Esa
>
>  
>
> *From:* jincheng sun  > 
> *Sent:* Thursday, April 4, 2019 1:05 PM
> *To:* Esa Heikkinen (TAU)  >
> *Cc:* user@flink.apache.org 
> *Subject:* Re: FlinkCEP and SQL?
>
>  
>
> Hi BR Esa,
>
> CEP is available in Flink SQL, Please the detail
> here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
>
> Best,
>
> Jincheng
>
>  
>
> Esa Heikkinen (TAU) wrote on Thu, Apr 4, 2019 at 4:44 PM:
>
> Hi
>
>  
>
> What is the situation of FlinkCEP and SQL?
>
>  
>
> Is it already possible to use SQL in CEP?
>
>  
>
> Is there any example cases where SQL is used in CEP?
>
>  
>
> BR Esa
>
>  
>




Re: Apache Flink - How to destroy global window and release it's resources

2019-04-11 Thread Aljoscha Krettek
For a GlobalWindow, returning TriggerResult.PURGE (or FIRE_AND_PURGE) and 
removing all trigger state (meaning timers and state) will get rid of all 
state. There is no state for the GlobalWindow itself, it only semantically 
exists because of the elements in it and the Trigger state.
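
For illustration, a count-based trigger on a GlobalWindow that fires, purges, and
cleans up after itself could look roughly like this (an untested sketch; the
threshold and the names are made up):

```
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Serializable sum function for the trigger's count state.
class LongSum extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
}

// Fires and purges once `maxCount` elements have arrived, then clears its own state,
// so nothing is left behind for that key's GlobalWindow.
class CountFireAndPurgeTrigger[T](maxCount: Long) extends Trigger[T, GlobalWindow] {

  private val countDesc =
    new ReducingStateDescriptor[java.lang.Long]("count", new LongSum, LongSerializer.INSTANCE)

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    count.add(1L)
    if (count.get() >= maxCount) {
      count.clear()                  // drop the trigger's own state
      TriggerResult.FIRE_AND_PURGE   // emit the window and drop its contents
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(countDesc).clear()
}
```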

Aljoscha

> On 11. Apr 2019, at 10:15, Fabian Hueske  wrote:
> 
> Hi,
> 
> As far as I know, a window is only completely removed when time (event or 
> processing time, depending on the window type) passes the window's end 
> timestamp.
> Since, GlobalWindow's end timestamp is Long.MAX_VALUE, it is never completely 
> removed.
> I'm not 100% sure what state is kept around. It might not be keyed state but 
> just objects on the heap but not absolutely sure.
> 
> Aljoscha (in CC) should know the details here.
> 
> Best, Fabian
> 
> On Thu, Apr 11, 2019 at 8:07 AM Guowei Ma wrote:
> Hi,
> I think you could return a proper TriggerResult, which defines how to deal 
> with the window elements after computing a window in your trigger 
> implementation. You could find the detail information from the doc[1].
> 
> 1. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#fire-and-purge
>  
> 
> Best,
> Guowei
> 
> 
> M Singh <mans2si...@yahoo.com> wrote on Thu, Apr 11, 2019 at 1:42 AM:
> Hi:
> 
> I have a use case where I need to create a global window where I need to wait 
> for unknown time for certain events for a particular key.  I understand that 
> I can create a global window and use a custom trigger to initiate the 
> function computation.  But I am not sure how to destroy the window after the 
> triggering condition is satisfied and the events are purged.
> 
> If there is any better way of dealing with this situation, please let me know.
> 
> Thanks
> 
> Mans



Join of DataStream and DataSet

2019-04-11 Thread Reminia Scarlet
Spark Streaming supports joining a streaming DataFrame directly with a batch
DataFrame, so it is easy to implement an enrichment pipeline that joins a
stream with a dimension table.

I checked the Flink docs, and it seems this feature is tracked in a Jira
ticket that has not been resolved yet.

So how can I implement such a pipeline easily in Flink?


RE: FlinkCEP and SQL?

2019-04-11 Thread Esa Heikkinen (TAU)
Hi

Are there any good scientific papers about using SQL MATCH_RECOGNIZE in Flink
and CEP?

The only good paper about Flink and CEP that I have found so far is “Efficient
Pattern Matching over Event Streams” (SIGMOD 2008). Its SASE+ Event Pattern
Language (EPL) is the best I have come across. I have been told that FlinkCEP
is based on this.

BR Esa

From: Fabian Hueske 
Sent: Thursday, April 11, 2019 11:33 AM
To: Esa Heikkinen (TAU) 
Cc: Dian Fu ; jincheng sun ; 
user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Hi Esa,

Flink's implementation of SQL MATCH_RECOGNIZE is based on it's CEP library, 
i.e., they share the same implementation.

Best, Fabian

On Thu, Apr 11, 2019 at 10:29 AM Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:
Hi

Is SQL CEP based (old) FlinkCEP at all and are SQL CEP and FlinkCEP completely 
separate ?

BR Esa

From: Dian Fu mailto:dian0511...@gmail.com>>
Sent: Thursday, April 4, 2019 2:37 PM
To: Esa Heikkinen (TAU) mailto:esa.heikki...@tuni.fi>>
Cc: jincheng sun mailto:sunjincheng...@gmail.com>>; 
user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Should the all sources be combined into one big table before operations with 
SQL CEP?

Yes, you should combine them into one table/stream.

Regards,
Dian

On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:

Hi

Thank you for the information. How this SQL CEP is applicable for situation 
where there are many sources with different type of events ? Should the all 
sources be combined into one big table before operations with SQL CEP?

BR Esa

From: jincheng sun mailto:sunjincheng...@gmail.com>>
Sent: Thursday, April 4, 2019 1:05 PM
To: Esa Heikkinen (TAU) mailto:esa.heikki...@tuni.fi>>
Cc: user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Hi BR Esa,
CEP is available in Flink SQL. Please see the details here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
Best,
Jincheng

Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote on Thu, Apr 4, 2019 at 4:44 PM:

Hi



What is the situation of FlinkCEP and SQL?



Is it already possible to use SQL in CEP?



Is there any example cases where SQL is used in CEP?



BR Esa



Re: FlinkCEP and SQL?

2019-04-11 Thread Fabian Hueske
Hi Esa,

Flink's implementation of SQL MATCH_RECOGNIZE is based on its CEP library,
i.e., they share the same implementation.

Best, Fabian

On Thu, Apr 11, 2019 at 10:29 AM Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:

> Hi
>
>
>
> Is SQL CEP based (old) FlinkCEP at all and are SQL CEP and FlinkCEP
> completely separate ?
>
>
>
> BR Esa
>
>
>
> *From:* Dian Fu 
> *Sent:* Thursday, April 4, 2019 2:37 PM
> *To:* Esa Heikkinen (TAU) 
> *Cc:* jincheng sun ; user@flink.apache.org
> *Subject:* Re: FlinkCEP and SQL?
>
>
>
> Should the all sources be combined into one big table before operations
> with SQL CEP?
>
>
>
> Yes, you should combine them into one table/stream.
>
>
>
> Regards,
>
> Dian
>
>
>
> On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) wrote:
>
>
>
> Hi
>
>
>
> Thank you for the information. How this SQL CEP is applicable for
> situation where there are many sources with different type of events ? Should
> the all sources be combined into one big table before operations with SQL
> CEP?
>
>
>
> BR Esa
>
>
>
> *From:* jincheng sun 
> *Sent:* Thursday, April 4, 2019 1:05 PM
> *To:* Esa Heikkinen (TAU) 
> *Cc:* user@flink.apache.org
> *Subject:* Re: FlinkCEP and SQL?
>
>
>
> Hi BR Esa,
>
> CEP is available in Flink SQL, Please the detail here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
>
> Best,
>
> Jincheng
>
>
>
> Esa Heikkinen (TAU) wrote on Thu, Apr 4, 2019 at 4:44 PM:
>
> Hi
>
>
>
> What is the situation of FlinkCEP and SQL?
>
>
>
> Is it already possible to use SQL in CEP?
>
>
>
> Is there any example cases where SQL is used in CEP?
>
>
>
> BR Esa
>
>
>


RE: FlinkCEP and SQL?

2019-04-11 Thread Esa Heikkinen (TAU)
Hi

Is SQL CEP based on the (old) FlinkCEP at all, or are SQL CEP and FlinkCEP
completely separate?

BR Esa

From: Dian Fu 
Sent: Thursday, April 4, 2019 2:37 PM
To: Esa Heikkinen (TAU) 
Cc: jincheng sun ; user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Should the all sources be combined into one big table before operations with 
SQL CEP?

Yes, you should combine them into one table/stream.

Regards,
Dian

On Apr 4, 2019, at 7:11 PM, Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote:

Hi

Thank you for the information. How this SQL CEP is applicable for situation 
where there are many sources with different type of events ? Should the all 
sources be combined into one big table before operations with SQL CEP?

BR Esa

From: jincheng sun mailto:sunjincheng...@gmail.com>>
Sent: Thursday, April 4, 2019 1:05 PM
To: Esa Heikkinen (TAU) mailto:esa.heikki...@tuni.fi>>
Cc: user@flink.apache.org
Subject: Re: FlinkCEP and SQL?

Hi BR Esa,
CEP is available in Flink SQL. Please see the details here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#pattern-recognition
Best,
Jincheng

Esa Heikkinen (TAU) <esa.heikki...@tuni.fi> wrote on Thu, Apr 4, 2019 at 4:44 PM:

Hi



What is the situation of FlinkCEP and SQL?



Is it already possible to use SQL in CEP?



Is there any example cases where SQL is used in CEP?



BR Esa



Re: Apache Flink - How to destroy global window and release it's resources

2019-04-11 Thread Fabian Hueske
Hi,

As far as I know, a window is only completely removed when time (event or
processing time, depending on the window type) passes the window's end
timestamp.
Since GlobalWindow's end timestamp is Long.MAX_VALUE, it is never
completely removed.
I'm not 100% sure what state is kept around. It might not be keyed state
but just objects on the heap, but I'm not absolutely sure.

Aljoscha (in CC) should know the details here.

Best, Fabian

On Thu, Apr 11, 2019 at 8:07 AM Guowei Ma wrote:

> Hi,
> I think you could return a proper TriggerResult, which defines how to deal
> with the window elements after computing a window in your trigger
> implementation. You could find the detail information from the doc[1].
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#fire-and-purge
> Best,
> Guowei
>
>
M Singh wrote on Thu, Apr 11, 2019 at 1:42 AM:
>
>> Hi:
>>
>> I have a use case where I need to create a global window where I need to
>> wait for unknown time for certain events for a particular key.  I
>> understand that I can create a global window and use a custom trigger to
>> initiate the function computation.  But I am not sure how to destroy the
>> window after the triggering condition is satisfied and the events are
>> purged.
>>
>> If there is any better way of dealing with this situation, please let me
>> know.
>>
>> Thanks
>>
>> Mans
>>
>


Re: Apache Flink - Question about broadcast state pattern usage

2019-04-11 Thread Fabian Hueske
Hi,

you would simply pass multiple MapStateDescriptors to the broadcast method:

MapStateDescriptor bcState1 = ...
MapStateDescriptor bcState2 = ...

DataStream stream = ...
BroadcastStream bcStream = stream.broadcast(bcState1, bcState2);
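
On the processing side each state is then looked up with its own descriptor. A
rough Scala sketch (the string types and the routing logic are made up; the
descriptors must be the same ones, i.e. with the same names, that were passed to
broadcast()):

```
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// The same two descriptors that were passed to broadcast() are used for lookups
// here (lookup matches by descriptor name). All types are hypothetical strings.
class TwoStateFunction(
    bcState1: MapStateDescriptor[String, String],
    bcState2: MapStateDescriptor[String, String])
  extends BroadcastProcessFunction[String, String, String] {

  override def processBroadcastElement(
      rule: String,
      ctx: BroadcastProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    // Made-up routing rule: decide which of the two states the element belongs to.
    if (rule.startsWith("A:")) ctx.getBroadcastState(bcState1).put(rule, rule)
    else ctx.getBroadcastState(bcState2).put(rule, rule)
  }

  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // Both broadcast states are readable here, each through its own descriptor.
    val inFirst = ctx.getBroadcastState(bcState1).contains(value)
    val inSecond = ctx.getBroadcastState(bcState2).contains(value)
    out.collect(s"$value: state1=$inFirst state2=$inSecond")
  }
}
```

You would then wire it up with stream.connect(bcStream).process(new TwoStateFunction(bcState1, bcState2)).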

Best,
Fabian


On Wed, Apr 10, 2019 at 7:44 PM M Singh wrote:

> Hi Guowei;
>
> Thanks for your answer.
>
> Do you have any example which illustrates how broadcast is used with
> multiple descriptors?
>
> Thanks
>
>
>
> On Sunday, April 7, 2019, 10:10:15 PM EDT, Guowei Ma 
> wrote:
>
>
> Hi
> 1. I think you could use "Using Managed Operator State"[1]
> (context.getOperatorStateStore().getBroadcastState()) to use the
> BroadCastState.  But you must use it very carefully and guarantee the
> semantics of broadcast state yourself. I think "The Broadcast State
> Pattern"[2] is some best practice for using broadcast state.
> 2. The broadcast function takes varargs, so you can pass multiple
> MapStateDescriptors to it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#using-managed-operator-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html
>
> Best,
> Guowei
>
>
M Singh wrote on Sun, Apr 7, 2019 at 10:17 PM:
>
> Hi Flink folks:
>
> I am reading the documentation on broadcast state pattern (
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html)
> and have following questions:
>
> 1. Point number 2 - '2. it is only available to specific operators that
> have as inputs a *broadcasted* stream and a *non-broadcasted* one,'.
> From what I understand it can be used with connected streams.  Is there any
> other operator where it can be used ?
>
> 2. Point number 3 - '3. such an operator can have *multiple broadcast
> states* with different names.'.  Is there any additional
> documentation/example on how to implement/use multiple broadcast states ?
>
> Thanks
>
> Mans
>
>


Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread Fabian Hueske
Hi Min,

I think the pool size is per parallel sink task, i.e., it should be
independent of the parallelism of the sink operator.
From my understanding, a pool size of 5 should be fine if the maximum number
of concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight
checkpoints that were not completed, which seems a lot to me (given that
the sink is probably at the end of the program).
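
For reference, the pool size is a constructor argument of the producer. A rough
sketch of how it is usually wired up (untested; the constructor overloads differ
between connector versions, so please double-check the Javadoc of the producer
class you use; the broker address, topic name, and the pool size of 10 below are
made up):

```
import java.util.{Optional, Properties}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceSinkSketch {

  def buildProducer(): FlinkKafkaProducer[String] = {
    val producerConfig = new Properties()
    producerConfig.setProperty("bootstrap.servers", "kafka:9092") // hypothetical broker
    // EXACTLY_ONCE uses Kafka transactions; keep transaction.timeout.ms consistent
    // with the broker's transaction.max.timeout.ms.
    producerConfig.setProperty("transaction.timeout.ms", "900000")

    new FlinkKafkaProducer[String](
      "output-topic",                                              // hypothetical topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      producerConfig,
      Optional.empty[FlinkKafkaPartitioner[String]](),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
      10                                                           // kafkaProducersPoolSize (default is 5)
    )
  }
}
```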

If I remember correctly, Piotr (in CC) was working on the exactly-once
feature of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon, Apr 8, 2019 at 2:43 PM  wrote:

> Hi,
>
>
>
> I keep getting exceptions
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
> ongoing snapshots. Increase kafka producers pool size or decrease number of
> concurrent checkpoints."
>
>
>
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to
> increase this size. What considerations should I take into account when
> increasing it? What is a reasonable value, e.g. 32?
>
>
>
> I run with a parallelism of 16 and have a checkpoint setting like this:
>
>
>
> public static void setup(StreamExecutionEnvironment env) {
>
> env.enableCheckpointing(2_000);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
>
> env.getCheckpointConfig().setCheckpointTimeout(60_000);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));
>
>
> //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> }
>
>
>
> Regards,
>
>
>
> Min
>


Re: What is the best way to handle data skew processing in Data Stream applications?

2019-04-11 Thread Fabian Hueske
Hi Felipe,

three comments:

1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
effect:
keyBy() introduces a hash partitioning such that any data partitioning that
you do immediately before keyBy() is destroyed.
You only change the distribution for the call of the key extractor which
should be a lightweight function anyway.
That's why you do not see any difference between the three methods.

2) windowAll() defines a non-keyed window over the whole stream.
All records are processed by the same non-parallel instance of the window
operator.
That's why assigning a higher parallelism to that operator does not help.

3) One approach to improve the processing of skewed data, is to change how
keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both.
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same key
group.

Best, Fabian

On Wed, Apr 10, 2019 at 11:50 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am studying data skew processing in Flink and how I can change the
> low-level control of physical partition (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
> in order to have an even processing of tuples. I have created synthetic
> skewed data sources and I aim to process (aggregate) them over a window.
> Here is the complete code:
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>
> streamTrainsStation01.union(streamTrainsStation02)
> .union(streamTicketsStation01).union(streamTicketsStation02)
> // map the keys
> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
> .rebalance() // or .rescale() .shuffle()
> .keyBy(new StationPlatformKeySelector())
> .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
> .apply(new
> StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
> .setParallelism(4)
> .map(new
> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
> topic)).name(metricSinkFunction)
> ;
>
> According to the Flink dashboard I could not see too much difference among
> .shuffle(), .rescale(), and .rebalance(). Even though the documentation
> says rebalance() transformation is more suitable for data skew.
>
> After that I tried to use .partitionCustom(partitioner, "someKey").
> However, for my surprise, I could not use setParallelism(4) on the window
> operation. The documentation says "Note: This operation is inherently
> non-parallel since all elements have to pass through the same operator
> instance.". I did not understand why. If I am allowed to do partitionCustom
> why can't I use parallelism after that?
> Here is the complete code:
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>
> streamTrainsStation01.union(streamTrainsStation02)
> .union(streamTicketsStation01).union(streamTicketsStation02)
> // map the keys
> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
> .partitionCustom(new StationPlatformKeyCustomPartitioner(), new
> StationPlatformKeySelector())
> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
> .apply(new
> StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
> .map(new
> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
> topic)).name(metricSinkFunction)
> ;
>
> Thanks,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: Apache Flink - How to destroy global window and release it's resources

2019-04-11 Thread Guowei Ma
Hi,
I think you could return a proper TriggerResult from your trigger
implementation; it defines how to deal with the window elements after a
window has been evaluated. You can find more detailed information in the docs[1].

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#fire-and-purge
Best,
Guowei


M Singh wrote on Thu, Apr 11, 2019 at 1:42 AM:

> Hi:
>
> I have a use case where I need to create a global window where I need to
> wait for unknown time for certain events for a particular key.  I
> understand that I can create a global window and use a custom trigger to
> initiate the function computation.  But I am not sure how to destroy the
> window after the triggering condition is satisfied and the events are
> purged.
>
> If there is any better way of dealing with this situation, please let me
> know.
>
> Thanks
>
> Mans
>