Re: REST API "broken" on YARN because POST is not allowed via YARN proxy

2018-05-30 Thread Juho Autio
Hi, I tried to search Flink Jira for this but couldn't find a ticket to
match. If there's no ticket yet, did I understand correctly though, that
you would be open to support GET as an alternative method for all of
Flink's REST operations?

Now that 1.5 has been released, it's a pity that this problem effectively prevents
us from upgrading – or forces us to spend time on creating a workaround.

On Thu, Apr 5, 2018 at 9:10 PM, Till Rohrmann  wrote:

> This improvement is unfortunately out of scope for the 1.5 release since
> the feature freeze is already quite some time ago. But I hope that this
> improvement will make it into the 1.6 release.
>
> Cheers,
> Till
>
> On Thu, Apr 5, 2018 at 4:45 PM, Juho Autio  wrote:
>
>> Thanks for the answer. Wrapping with GET sounds good to me. You said next
>> version; do you mean that Flink 1.5 would already include this improvement
>> when it's released?
>>
>> On Thu, Apr 5, 2018 at 2:40 PM, Till Rohrmann 
>> wrote:
>>
>>> Hi Juho,
>>>
>>> you are right that due to a limitation in the Yarn proxy [1] we cannot
>>> directly contact the cluster through the Yarn proxy.
>>>
>>> The way it works at the moment is that the Flink client retrieves the
>>> AM's hostname through the ApplicationReport and then directly talks to the
>>> AM. This of course requires that one can reach the respective Yarn
>>> container. This is a limitation, though not a regression, which we want to
>>> improve with the next version of Flink. An idea would be to wrap the REST
>>> calls in a GET call to make them pass through the Yarn proxy.
>>>
>>> [1] https://issues.apache.org/jira/browse/YARN-2084
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 4, 2018 at 4:31 PM, Fabian Hueske  wrote:
>>>
 Hi Juho,

 Thanks for raising this point!

 I'll add Chesnay and Till to the thread who contributed to the REST API.

 Best, Fabian

 2018-04-04 15:02 GMT+02:00 Juho Autio :

> I just learned that Flink savepoints API was refactored to require
> using HTTP POST.
>
> That's fine otherwise, but makes life harder when Flink is run on top
> of YARN.
>
> I've added example calls below to show how POST is declined by
> the hadoop-yarn-server-web-proxy*, which only supports GET and PUT.
>
> Can you think of any solution to this? If I would be able to determine
> the actual host & port for Flink UI, I could use that instead of the proxy
> address. But that would probably require opening at least one more port, 
> so
> it's not the optimal solution either. Ideally I would have Flink REST API
> completely accessible with GET and PUT methods.
>
> To me it seems like AWS EMR will also hit this issue as soon as they
> start supporting Flink 1.5, because they seem to run Flink as a YARN app.
>
>
> *) https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-p
> roject/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web
> -proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/
> WebAppProxyServlet.java#L296-L306
>
>
> $ http POST http://10.0.10.71:20888/proxy/
> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
> d4f8b908/checkpoints
>
> HTTP/1.1 405 HTTP method POST is not supported by this URL
> Cache-Control: must-revalidate,no-cache,no-store
> Content-Length: 1523
> Content-Type: text/html; charset=iso-8859-1
> Date: Wed, 04 Apr 2018 12:48:05 GMT
> Date: Wed, 04 Apr 2018 12:48:05 GMT
> Pragma: no-cache
> Pragma: no-cache
>
> 
> 
> 
> Error 405 HTTP method POST is not supported by this URL
> 
> HTTP ERROR 405
> Problem accessing /proxy/application_15228441533
> 47_0001/jobs/652e207f8578574d4a322e23d4f8b908/checkpoints. Reason:
> HTTP method POST is not supported by this URL />Powered by Jetty://
> 
> 
> $ http PUT http://10.0.10.71:20888/proxy/
> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
> d4f8b908/checkpoints
>
> HTTP/1.1 404 Not Found
> Access-Control-Allow-Origin: *
> Cache-Control: no-cache
> Content-Length: 25
> Content-Type: application/json; charset=UTF-8
> Date: Wed, 04 Apr 2018 12:48:09 GMT
> Date: Wed, 04 Apr 2018 12:48:09 GMT
> Expires: Wed, 04 Apr 2018 12:48:09 GMT
> Expires: Wed, 04 Apr 2018 12:48:09 GMT
> Pragma: no-cache
> Pragma: no-cache
>
> {
> "errors": [
> "Not found."
> ]
> }
>
> ^ expected to get this from flink because there's no PUT /checkpoints.
>


>>>
>>
>


Re: NPE in flink sql over-window

2018-05-30 Thread Dawid Wysakowicz
Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access
a list of elements that was already cleared and does not check against
null. Could you please file a JIRA for that?


Best,

Dawid


On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's
> printed by JVM and doesn't have timestamp. But I suspect that it's
> because so many failure recoveries from checkpoint and the sql queries
> are dynamically compiled too many times.
>
>
>
> /Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full.
> Compiler has been disabled./
> /Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code
> cache size using -XX:ReservedCodeCacheSize=/
> /CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb/
> /bounds [0x7fa4fd00, 0x7fa50c00, 0x7fa50c00]/
> /total_blobs=54308 nmethods=53551 adapters=617/
> /compilation: disabled (not enough contiguous free space left)/
>
>
>
> 
> *From:* Yan Zhou [FDS Science] 
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>  
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from
> same line of code. 
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a
> sink. NPE throws from class ProcTimeBoundedRangeOver. Please see
> exception log at the bottom.
>
>
> The exceptions always happens after the application started
> for /maxIdleStateRetentionTime /time.  What could be the possible causes? 
>
>
> Best
>
> Yan
>
>
> /2018-05-27 11:03:37,656 INFO 
> org.apache.flink.runtime.taskmanager.Task                     - over:
> (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 8640 PRECEDI/
> /NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: /
> /(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter
> -> Sink: Unnamed (3/15) (327/
> /efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED./
> /TimerException{java.lang.NullPointerException}/
> /       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)/
> /       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)/
> /       at java.util.concurrent.FutureTask.run(FutureTask.java:266)/
> /       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)/
> /       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)/
> /       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)/
> /       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)/
> /       at java.lang.Thread.run(Thread.java:748)/
> /Caused by: java.lang.NullPointerException/
> /       at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)/
> /       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)/
> /       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)/
> /       at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)/
> /       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)/
>
>
>



signature.asc
Description: OpenPGP digital signature


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-30 Thread Till Rohrmann
Great to hear :-)

On Tue, May 29, 2018 at 4:56 PM, Amit Jain  wrote:

> Thanks Till. `taskmanager.network.request-backoff.max` option helped in
> my case.  We tried this on 1.5.0 and jobs are running fine.
>
>
> --
> Thanks
> Amit
>
> On Thu 24 May, 2018, 4:58 PM Amit Jain,  wrote:
>
>> Thanks! Till. I'll give a try on your suggestions and update the thread.
>>
>> On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann 
>> wrote:
>> > Hi Amit,
>> >
>> > it looks as if the current cancellation cause is not the same as the
>> > initially reported cancellation cause. In the current case, it looks as
>> if
>> > the deployment of your tasks takes so long that that maximum
>> > `taskmanager.network.request-backoff.max` value has been reached. When
>> this
>> > happens a task gives up asking for the input result partition and fails
>> with
>> > the `PartitionNotFoundException`.
>> >
>> > More concretely, the `CHAIN Reduce (GroupReduce at
>> > first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot
>> > retrieve the result partition of the `CHAIN DataSource (at
>> > createInput(ExecutionEnvironment.java:548)
>> > (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
>> > org.apache.flink.api.java.io.TextInputFormat
>> > [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/
>> redshift_logs/2018/5/20/14/,
>> > s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/
>> redshift_logs/2018/5/20/15/0/])
>> > -> Map (Key Extractor) -> Combine (GroupReduce at
>> > first(SortedGrouping.java:210)) (8/14)` task. This tasks is in the
>> state
>> > deploying when the exception occurs. It seems to me that this task takes
>> > quite some time to be deployed.
>> >
>> > One reason why the deployment could take some time is that an UDF (for
>> > example the closure) of one of the operators is quite large. If this is
>> the
>> > case, then Flink offloads the corresponding data onto the BlobServer
>> from
>> > where they are retrieved by the TaskManagers. Since you are running in
>> > non-HA mode, the BlobServer won't store the blobs on HDFS from where
>> they
>> > could be retrieved. Instead all the TaskManagers ask the single
>> BlobServer
>> > for the required TDD blobs. Depending on the size of the TDDs, the
>> > BlobServer might become the bottleneck.
>> >
>> > What you can try to do is the following
>> > 1) Try to reduce the closure object of the UDFs in the above-mentioned
>> task.
>> > 2) Increase `taskmanager.network.request-backoff.max` to give the
>> system
>> > more time to download the blobs
>> > 3) Run the cluster in HA mode which will store the blobs also under
>> > `high-availability.storageDir` (usually HDFS or S3). Before downloading
>> the
>> > blobs from the BlobServer, Flink will first try to download them from
>> the
>> > `high-availability-storageDir`
>> >
>> > Let me know if this solves your problem.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, May 22, 2018 at 1:29 PM, Amit Jain  wrote:
>> >>
>> >> Hi Nico,
>> >>
>> >> Please find the attachment for more logs.
>> >>
>> >> --
>> >> Thanks,
>> >> Amit
>> >>
>> >> On Tue, May 22, 2018 at 4:09 PM, Nico Kruber 
>> >> wrote:
>> >> > Hi Amit,
>> >> > thanks for providing the logs, I'll look into it. We currently have a
>> >> > suspicion of this being caused by
>> >> > https://issues.apache.org/jira/browse/FLINK-9406 which we found by
>> >> > looking over the surrounding code. The RC4 has been cancelled since
>> we
>> >> > see this as a release blocker.
>> >> >
>> >> > To rule out further errors, can you also provide logs for the task
>> >> > manager producing partitions d6946b39439f10e8189322becf1b8887,
>> >> > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c
>> 81?
>> >> > The task manager log you provided covers the task manager asking for
>> >> > this partition only for which the job manager produces the
>> >> > PartitionProducerDisposedException that you see.
>> >> > I'm looking for the logs of task managers with the following
>> execution
>> >> > IDs in their logs:
>> >> > - 2826f9d430e05e9defaa46f60292fa79
>> >> > - 7ef992a067881a07409819e3aea32004
>> >> > - ec923ce6d891d89cf6fecb5e3f5b7cc5
>> >> >
>> >> > Regarding the operators being stuck: I'll have a further look into
>> the
>> >> > logs and state transition and will come back to you.
>> >> >
>> >> >
>> >> > Nico
>> >> >
>> >> >
>> >> > On 21/05/18 09:27, Amit Jain wrote:
>> >> >> Hi All,
>> >> >>
>> >> >> I totally missed this thread. I've encountered same issue in Flink
>> >> >> 1.5.0 RC4. Please look over the attached logs of JM and impacted TM.
>> >> >>
>> >> >> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
>> >> >>
>> >> >> --
>> >> >> Thanks,
>> >> >> Amit
>> >> >>
>> >> >> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber > >
>> >> >> wrote:
>> >> >>> Also, please have a look at the other TaskManagers' logs, in
>> >> >>> particular
>> >> >>> the one that is running the operator that was mentioned in the
>> >> >>> exception. You should look out for the ID
>> >> >>> 98f59767

Re: REST API "broken" on YARN because POST is not allowed via YARN proxy

2018-05-30 Thread Till Rohrmann
Hi Juho,

I created a JIRA issue for the problem [1]. Yes, the solution would be to
use only GET requests. Either we wrap our requests in a GET request or we
change our handlers to accept GET requests only. One thing we have to fix
first is that the jar file upload also goes through REST.

[1] https://issues.apache.org/jira/browse/FLINK-9478

Cheers,
Till

On Wed, May 30, 2018 at 9:07 AM, Juho Autio  wrote:

> Hi, I tried to search Flink Jira for this but couldn't find a ticket to
> match. If there's no ticket yet, did I understand correctly though, that
> you would be open to support GET as an alternative method for all of
> Flink's REST operations?
>
> Now that 1.5 was released, it's a pity that this problem kind of prevents
> us from upgrading – or spend time on creating a workaround.
>
>
> On Thu, Apr 5, 2018 at 9:10 PM, Till Rohrmann 
> wrote:
>
>> This improvement is unfortunately out of scope for the 1.5 release since
>> the feature freeze is already quite some time ago. But I hope that this
>> improvement will make it into the 1.6 release.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 5, 2018 at 4:45 PM, Juho Autio  wrote:
>>
>>> Thanks for the answer. Wrapping with GET sounds good to me. You said
>>> next version; do you mean that Flink 1.5 would already include this
>>> improvement when it's released?
>>>
>>> On Thu, Apr 5, 2018 at 2:40 PM, Till Rohrmann 
>>> wrote:
>>>
 Hi Juho,

 you are right that due to a limitation in the Yarn proxy [1] we cannot
 directly contact the cluster through the Yarn proxy.

 The way it works at the moment is that the Flink client retrieves the
 AM's hostname through the ApplicationReport and then directly talks to the
 AM. This of course requires that one can reach the respective Yarn
 container. This is a limitation, though not a regression, which we want to
 improve with the next version of Flink. An idea would be to wrap the REST
 calls in a GET call to make them pass through the Yarn proxy.

 [1] https://issues.apache.org/jira/browse/YARN-2084

 Cheers,
 Till

 On Wed, Apr 4, 2018 at 4:31 PM, Fabian Hueske 
 wrote:

> Hi Juho,
>
> Thanks for raising this point!
>
> I'll add Chesnay and Till to the thread who contributed to the REST
> API.
>
> Best, Fabian
>
> 2018-04-04 15:02 GMT+02:00 Juho Autio :
>
>> I just learned that Flink savepoints API was refactored to require
>> using HTTP POST.
>>
>> That's fine otherwise, but makes life harder when Flink is run on top
>> of YARN.
>>
>> I've added example calls below to show how POST is declined by
>> the hadoop-yarn-server-web-proxy*, which only supports GET and PUT.
>>
>> Can you think of any solution to this? If I would be able to
>> determine the actual host & port for Flink UI, I could use that instead 
>> of
>> the proxy address. But that would probably require opening at least one
>> more port, so it's not the optimal solution either. Ideally I would have
>> Flink REST API completely accessible with GET and PUT methods.
>>
>> To me it seems like AWS EMR will also hit this issue as soon as they
>> start supporting Flink 1.5, because they seem to run Flink as a YARN app.
>>
>>
>> *) https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-p
>> roject/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web
>> -proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/
>> WebAppProxyServlet.java#L296-L306
>>
>>
>> $ http POST http://10.0.10.71:20888/proxy/
>> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
>> d4f8b908/checkpoints
>>
>> HTTP/1.1 405 HTTP method POST is not supported by this URL
>> Cache-Control: must-revalidate,no-cache,no-store
>> Content-Length: 1523
>> Content-Type: text/html; charset=iso-8859-1
>> Date: Wed, 04 Apr 2018 12:48:05 GMT
>> Date: Wed, 04 Apr 2018 12:48:05 GMT
>> Pragma: no-cache
>> Pragma: no-cache
>>
>> 
>> 
>> 
>> Error 405 HTTP method POST is not supported by this URL
>> 
>> HTTP ERROR 405
>> Problem accessing /proxy/application_15228441533
>> 47_0001/jobs/652e207f8578574d4a322e23d4f8b908/checkpoints. Reason:
>> HTTP method POST is not supported by this URL> />Powered by Jetty://
>> 
>> 
>> $ http PUT http://10.0.10.71:20888/proxy/
>> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
>> d4f8b908/checkpoints
>>
>> HTTP/1.1 404 Not Found
>> Access-Control-Allow-Origin: *
>> Cache-Control: no-cache
>> Content-Length: 25
>> Content-Type: application/json; charset=UTF-8
>> Date: Wed, 04 Apr 2018 12:48:09 GMT
>> Date: Wed, 04

Re: REST API "broken" on YARN because POST is not allowed via YARN proxy

2018-05-30 Thread Juho Autio
Thanks, Till!

On Wed, May 30, 2018 at 10:39 AM, Till Rohrmann 
wrote:

> Hi Juho,
>
> I created a JIRA issue for the problem [1]. Yes, the solution would be to
> use only GET requests. Either we wrap our requests in a GET request or we
> change our handlers to accept GET requests only. One thing which we have to
> fix first is that also the jar file upload goes through REST.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9478
>
> Cheers,
> Till
>
> On Wed, May 30, 2018 at 9:07 AM, Juho Autio  wrote:
>
>> Hi, I tried to search Flink Jira for this but couldn't find a ticket to
>> match. If there's no ticket yet, did I understand correctly though, that
>> you would be open to support GET as an alternative method for all of
>> Flink's REST operations?
>>
>> Now that 1.5 was released, it's a pity that this problem kind of prevents
>> us from upgrading – or spend time on creating a workaround.
>>
>>
>> On Thu, Apr 5, 2018 at 9:10 PM, Till Rohrmann 
>> wrote:
>>
>>> This improvement is unfortunately out of scope for the 1.5 release since
>>> the feature freeze is already quite some time ago. But I hope that this
>>> improvement will make it into the 1.6 release.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 5, 2018 at 4:45 PM, Juho Autio  wrote:
>>>
 Thanks for the answer. Wrapping with GET sounds good to me. You said
 next version; do you mean that Flink 1.5 would already include this
 improvement when it's released?

 On Thu, Apr 5, 2018 at 2:40 PM, Till Rohrmann 
 wrote:

> Hi Juho,
>
> you are right that due to a limitation in the Yarn proxy [1] we cannot
> directly contact the cluster through the Yarn proxy.
>
> The way it works at the moment is that the Flink client retrieves the
> AM's hostname through the ApplicationReport and then directly talks to the
> AM. This of course requires that one can reach the respective Yarn
> container. This is a limitation, though not a regression, which we want to
> improve with the next version of Flink. An idea would be to wrap the REST
> calls in a GET call to make them pass through the Yarn proxy.
>
> [1] https://issues.apache.org/jira/browse/YARN-2084
>
> Cheers,
> Till
>
> On Wed, Apr 4, 2018 at 4:31 PM, Fabian Hueske 
> wrote:
>
>> Hi Juho,
>>
>> Thanks for raising this point!
>>
>> I'll add Chesnay and Till to the thread who contributed to the REST
>> API.
>>
>> Best, Fabian
>>
>> 2018-04-04 15:02 GMT+02:00 Juho Autio :
>>
>>> I just learned that Flink savepoints API was refactored to require
>>> using HTTP POST.
>>>
>>> That's fine otherwise, but makes life harder when Flink is run on
>>> top of YARN.
>>>
>>> I've added example calls below to show how POST is declined by
>>> the hadoop-yarn-server-web-proxy*, which only supports GET and PUT.
>>>
>>> Can you think of any solution to this? If I would be able to
>>> determine the actual host & port for Flink UI, I could use that instead 
>>> of
>>> the proxy address. But that would probably require opening at least one
>>> more port, so it's not the optimal solution either. Ideally I would have
>>> Flink REST API completely accessible with GET and PUT methods.
>>>
>>> To me it seems like AWS EMR will also hit this issue as soon as they
>>> start supporting Flink 1.5, because they seem to run Flink as a YARN 
>>> app.
>>>
>>>
>>> *) https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-p
>>> roject/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web
>>> -proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/
>>> WebAppProxyServlet.java#L296-L306
>>>
>>>
>>> $ http POST http://10.0.10.71:20888/proxy/
>>> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
>>> d4f8b908/checkpoints
>>>
>>> HTTP/1.1 405 HTTP method POST is not supported by this URL
>>> Cache-Control: must-revalidate,no-cache,no-store
>>> Content-Length: 1523
>>> Content-Type: text/html; charset=iso-8859-1
>>> Date: Wed, 04 Apr 2018 12:48:05 GMT
>>> Date: Wed, 04 Apr 2018 12:48:05 GMT
>>> Pragma: no-cache
>>> Pragma: no-cache
>>>
>>> 
>>> 
>>> 
>>> Error 405 HTTP method POST is not supported by this
>>> URL
>>> 
>>> HTTP ERROR 405
>>> Problem accessing /proxy/application_15228441533
>>> 47_0001/jobs/652e207f8578574d4a322e23d4f8b908/checkpoints. Reason:
>>> HTTP method POST is not supported by this URL>> />Powered by Jetty://
>>> 
>>> 
>>> $ http PUT http://10.0.10.71:20888/proxy/
>>> application_1522844153347_0001/jobs/652e207f8578574d4a322e23
>>> d4f8b908/checkpoints
>

Re: Build Cassandra Connector with some customization

2018-05-30 Thread Chesnay Schepler

directory: flink-connectors/flink-connector-cassandra
command: mvn package -Ddriver.version=3.1.4 -Dguava.version=16.0.1
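
For completeness, a sketch of the full sequence (assuming the Flink source tree is already checked out; -DskipTests only shortens the build):

cd flink/flink-connectors/flink-connector-cassandra
mvn clean package -DskipTests -Ddriver.version=3.1.4 -Dguava.version=16.0.1

The resulting connector jar ends up in the module's target/ directory.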

On 29.05.2018 14:41, Soheil Pourbafrani wrote:

I want to build Flink Cassandra connector against
datastax version 3.1.4
guava 16.0.1

Using which command can I do that, and in which Flink source directory?





Re: NPE in flink sql over-window

2018-05-30 Thread Fabian Hueske
Hi,

Dawid's analysis is certainly correct, but looking at the code this should
not happen.

I have a few questions:
- You said this only happens if you configure idle state retention times,
right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz :

> Hi Yan,
>
>
> I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against null.
> Could you please file a JIRA for that?
>
>
> Best,
>
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's printed
> by JVM and doesn't have timestamp. But I suspect that it's because so
> many failure recoveries from checkpoint and the sql queries are dynamically
> compiled too many times.
>
>
>
> *Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler
> has been disabled.*
> *Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache
> size using -XX:ReservedCodeCacheSize=*
> *CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb*
> *bounds [0x7fa4fd00, 0x7fa50c00, 0x7fa50c00]*
> *total_blobs=54308 nmethods=53551 adapters=617*
> *compilation: disabled (not enough contiguous free space left)*
>
>
>
> --
> *From:* Yan Zhou [FDS Science]  
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from same
> line of code.
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a sink.
> NPE throws from class ProcTimeBoundedRangeOver. Please see exception log
> at the bottom.
>
>
> The exceptions always happens after the application started for 
> *maxIdleStateRetentionTime
> *time.  What could be the possible causes?
>
>
> Best
>
> Yan
>
>
> *2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task
>- over: (PARTITION BY: uid, ORDER BY: proctime,
> RANGEBETWEEN 8640 PRECEDI*
> *NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: *
> *(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter ->
> Sink: Unnamed (3/15) (327*
> *efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.*
> *TimerException{java.lang.NullPointerException}*
> *   at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)*
> *   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> *   at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> *   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)*
> *   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)*
> *   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> *   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> *   at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.lang.NullPointerException*
> *   at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)*
> *   at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)*
> *   at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)*
> *   at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)*
> *   at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)*
>
>
>
>
>


RE: env.execute() ?

2018-05-30 Thread Esa Heikkinen
Hi

OK, thanks for the clarification. But is controlling savepoints only possible via the 
command line (or a script)? Or is it possible to do it internally, in sync with the 
application?

Esa

From: Shuyi Chen 
Sent: Wednesday, May 30, 2018 8:18 AM
To: Esa Heikkinen 
Cc: Fabian Hueske ; user@flink.apache.org
Subject: Re: env.execute() ?

Hi Esa,

I think having more than one env.execute() is anti-pattern in Flink.

env.execute() behaves differently depending on the env. For local, it will 
generate the flink job graph, and start a local mini cluster in background to 
run the job graph directly.
For remote case, it will generate the flink job graph and submit it to a remote 
cluster, e.g. running on YARN/Mesos, the local process might stay attached or 
detach to the job on the remote cluster given options. So it's not a simple 
"unstoppable forever loop", and I dont think the "stop env.execute() and then 
do something and after that restart it" will work in general.

But I think you can take a look at savepoints [1] and checkpoints [2] in Flink. 
With savepoints, you can stop the running job, and do something else, and 
restart from the savepoints to resume the processing.


[1]  
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/savepoints.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html

Thanks
Shuyi

On Tue, May 29, 2018 at 3:56 AM, Esa Heikkinen 
mailto:esa.heikki...@student.tut.fi>> wrote:
Hi

Are there only one env.execute() in application ?

Is it unstoppable forever loop ?

Or can I stop env.execute() and then do something and after that restart it ?

Best, Esa

From: Fabian Hueske mailto:fhue...@gmail.com>>
Sent: Tuesday, May 29, 2018 1:35 PM
To: Esa Heikkinen 
mailto:esa.heikki...@student.tut.fi>>
Cc: user@flink.apache.org
Subject: Re: env.execute() ?

Hi,

It is mandatory for all DataStream programs and most DataSet programs.

Exceptions are ExecutionEnvironment.print() and ExecutionEnvironment.collect().
Both methods are defined on the DataSet ExecutionEnvironment and call execute() 
internally.

Best, Fabian

2018-05-29 12:31 GMT+02:00 Esa Heikkinen 
mailto:esa.heikki...@student.tut.fi>>:
Hi

Is it env.execute() mandatory at the end of application ? It is possible to run 
the application without it ?

I found some examples where it is missing.

Best, Esa




--
"So you have to trust that the dots will somehow connect in your future."


Re: Task did not exit gracefully and lost TaskManager

2018-05-30 Thread makeyang
I met the same problem in 1.4. When I cancel the job, one of the TaskManagers keeps 
logging the exception.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[ANNOUNCE] Weekly community update #22

2018-05-30 Thread Till Rohrmann
Dear community,

I know it has been quite some time but here is the weekly community
update thread
#22. Please post any news and updates you want to share with the community to
this thread.

# Release 1.5

After a lot of work and good amount of testing, the community has finally
released Apache Flink 1.5.0. The new release contains a lot of new features
like a reworked deployment model, support for broadcast state, SQL CLI,
full resource elasticity, faster recovery and an enhanced network stack to
support higher throughput and lower latencies and many more [1].

# New committers

In the past weeks the Flink community could gain a couple of new
committers. The community is proud that Xingcan Cui, Nico Kruber and Shuyi
Chen are helping to stack up the committer ranks.

# On going discussions

## Enable GitBox integration for Flink's repository

In order to give the community a bit more tooling around its GitHub
mirror, the community decided to activate the GitBox integration [2]. That
way, PR management and reviewing will hopefully be facilitated.

## Improving Flink's timer management

An effort to improve Flink's timer management has been started [3]. The
goal is to make Flink timers scalable and asynchronously checkpointable.

## Adding Elastic Filter for record deduplication

The community started a discussion about adding an approximate
deduplication operator based on filters (e.g. Bloom, Cuckoo, etc.) [4].
Having an approximate filter would make it possible to speed up certain operations.

# Flink Forward Berlin 2018 call for presentations

The call for presentations for Flink Forward Berlin 2018 [5] closes on June
4, 11:59 PM CEST. So if you have an interesting topic to share with the
community, then please submit a proposal.

[1] http://flink.apache.org/news/2018/05/25/release-1.5.0.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Enable-GitBox-integration-2-td22404.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Improving-Flink-s-timer-management-for-large-state-td22491.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Introduce-Elastic-Bloom-Filter-For-Flink-td22430.html
[5] https://flink-forward.org/call-for-presentations-submit-talk/

Cheers,
Till


ML in Streaming API

2018-05-30 Thread Thodoris Bitsakis
Hello and thanks for the subscription!

I am using the Streaming API to develop an ML algorithm and I would like your
opinions regarding the following issues: 

*1)* The input is read from a large file of d-dimensional points, and I
want to perform a parallel count window. In each parallel count window, I
want to apply a function that maintains a list of buckets in memory so that
it can be checkpointed (exploiting the state feature). For every parallel
count window, some (possibly zero) of the buckets will be updated or deleted.

*My thoughts: *
As there is no logical key and there is no parallel countWindowAll, is the
correct way to use a parallel flatMap operator? But then I assume that I must
implement custom buffering of the input data using ListState to implement the
count window. Also, I could use another ListState to maintain the list of
buckets in memory. But then, every time I want to update a specific buffer of
the ListState, I must clear the ListState and reinsert all buffers again
(not optimal for big buffers)?

The other way is to use a deterministic pseudo-key and
keyBy().countWindow(). The number of distinct keys would equal the
parallelism. In order to update some of the buckets for every key (parallel
instance), I am considering using MapState (UK = bucket index, UV = bucket
elements). In that case I think the use of a pseudo-key is not the best
technique, and I would also introduce an unnecessary data shuffle (keyBy)?

*What is the best way? Or is there another way to solve the previous
issues?*
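
For illustration, a minimal sketch of the MapState variant mentioned above (Point, Bucket, the output type and the bucketing rule are assumed placeholders, not a definitive design). It shows that after a keyBy on the pseudo-key, a single bucket can be updated in place without clearing and re-inserting a whole ListState:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Point(id: Long, value: Double)    // assumed input type
case class Bucket(count: Long, sum: Double)  // assumed bucket/sketch entry

// used after keyBy(pseudo-key); each parallel instance keeps its buckets in
// MapState, so one bucket can be updated or removed without touching the rest
class BucketUpdater(numBuckets: Int) extends RichFlatMapFunction[Point, String] {

  @transient private var buckets: MapState[Integer, Bucket] = _

  override def open(parameters: Configuration): Unit = {
    buckets = getRuntimeContext.getMapState(
      new MapStateDescriptor[Integer, Bucket]("buckets", classOf[Integer], classOf[Bucket]))
  }

  override def flatMap(p: Point, out: Collector[String]): Unit = {
    val idx: Integer = (p.id % numBuckets).toInt                  // assumed bucketing rule
    val old = Option(buckets.get(idx)).getOrElse(Bucket(0L, 0.0))
    buckets.put(idx, Bucket(old.count + 1, old.sum + p.value))    // only this entry is rewritten
    // buckets.remove(idx) would delete a single bucket
  }
}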

*2)* When there is no more input data (EOF), or when a user “asks” for a
partial evaluation of the ML algorithm through an external source, I want to
collect the lists of buckets from the parallel operator instances into another
reduce-style operator with parallelism 1 to compute the final list (a classic
map-reduce scenario). When there is no user query or EOF, I don't want
the parallel operator instances to emit anything.

*My thoughts:* *I don't know how the user would “ask” the parallel Flink
operator instances (parallel count window) to emit their results to the
downstream operator of parallelism 1.* I also don't know how the operator
instances would know that the file has ended (if I use keyBy().countWindow() I
can use a custom trigger with a timer? And what about the flatMap case?)

The concept is that the list of buckets in each parallel operator instance
is a local sketch, and I want to collect the local sketches when the user
“asks” to calculate the final sketch (*see the following diagram*).

Any thoughts are very much appreciated!!! Thank you in advance.

 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-05-30 Thread Fabian Wollert
Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a filesystem
backend. I configured the following:

state.backend=filesystem
state.backend.fs.checkpointdir=s3:///mybucket/
state.checkpoints.dir=s3:///mybucket/
state.checkpoints.num-retained=3

I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.

I get now though the following error message:

Caused by: java.lang.NullPointerException: null uri host.
at java.util.Objects.requireNonNull(Objects.java:228)
at
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
at
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
at
org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:133)

I tried to dig deeper into the source code, but struggled to find

   - what is meant by this URI
   - where to configure it

Can anybody give some advice how to set up the S3 Backend with the new
shaded lib jar?
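
One thing that may be worth comparing: the error complains about a null URI host, and the s3:///mybucket/ form above has an empty authority between the slashes, so there is no host for S3xLoginHelper.buildFSURI to pick up. A configuration sketch with the bucket name in the host position (bucket and path are placeholders) would look like:

state.backend=filesystem
state.backend.fs.checkpointdir=s3://mybucket/flink-checkpoints
state.checkpoints.dir=s3://mybucket/flink-checkpoints
state.checkpoints.num-retained=3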

Thanks in advance
--


*Fabian WollertZalando SE*

E-Mail: fabian.woll...@zalando.de

Tamara-Danz-Straße 1
10243 Berlin
Fax: +49 (0)30 2759 46 93
E-mail: legalnot...@zalando.co.uk
Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
2000889349

Management Board:
Robert Gentz, David Schneider, Rubin Ritter

Chairman of the Supervisory Board:
Lothar Lanz

Person responsible for providing the contents of Zalando SE acc. to Art. 55
RStV [Interstate Broadcasting Agreement]: Rubin Ritter
Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
VAT registration number: DE 260543043


Use element of the DataStream in parameter of RichMapFunction (open function not called)

2018-05-30 Thread Robin, Isabelle
Hello,

I'm working with Flink 1.4.2 (Scala API) and I'm having some trouble with my 
custom RichMapFunction, as I want the element in my DataStream to also be used 
as a parameter of this custom class. My RichMapFunction is a simple counter 
based on a MapState.

Let's say I have those classes

-  case class Feature(transaction: Transaction) { override def getKey: 
(String, String) = ... }

-  class CustomMapFunction(featureKey: (String, String)) extends 
RichMapFunction[Transaction, Transaction]

I implemented my custom map function with the needed functions but I 
encountered different issues as I tried several solutions for this. In the 
following chunks of code, stream is a DataStream[Transaction] and I expect a 
DataStream[Transaction] as output type too


* stream.keyBy(transaction => 
Feature(transaction).getKey).map(transaction => new 
CustomMapFunction(Feature(transaction).getKey))

o   this leads to a compilation error ("Expression of type CustomMapFunction 
doesn't conform to expected type R_"), which, as far as I understand, should 
come from the fact I'm already using transaction for the Feature(transaction) 
part

* stream.keyBy(transaction => 
Feature(transaction).getKey).map(transaction => new 
CustomMapFunction(Feature(transaction).getKey).map(transaction))

o   compiles but fails with a NullPointerException at runtime as the MapState 
is not initialized. When running with debugger the open function was not used 
which leads the MapState to stay null (I don't have this problem with a more 
simple version of my CustomMapFunction which does not need this parameter based 
on the transaction)

Do you have an idea of how I could solve this issue?
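
For reference, one possible direction (a sketch only, with the Transaction and Feature types from above as assumed placeholders, and a ValueState counter standing in for the MapState): construct the RichMapFunction once and derive the feature key from the element inside map(); since the stream is keyed by Feature(t).getKey, the keyed state is already scoped to that key, and open() gets called because the operator receives a single function instance instead of one per record:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

class CountingMapFunction extends RichMapFunction[Transaction, Transaction] {

  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def map(t: Transaction): Transaction = {
    // state is implicitly scoped to the current key, i.e. Feature(t).getKey
    val c = Option(count.value()).map(_.longValue()).getOrElse(0L)
    count.update(c + 1)
    t
  }
}

// usage: stream.keyBy(t => Feature(t).getKey).map(new CountingMapFunction)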

Thanks in advance for any help and I hope I was clear enough (that's my first 
question on the mailing list, don't hesitate to say if I forgot some steps or 
elements :))

Best regards,

Isabelle


Re: TimerService/Watermarks and Checkpoints

2018-05-30 Thread Narayanan Arunachalam
Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
outOfOrderEvents are reported, because the event time on the data will
always be greater than Long.MIN_VALUE.

Following are the steps to reproduce this scenario.
- A source to produce events with timestamps that is increasing for every
event produced
- Use TimeCharacteristic.EventTime
- Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set to
60s.
- Enable checkpoints
- ProcessFunction impl to report a counter to some metrics backend when the
timestamp of the event is less than currentWatermark
- No out of order events will be reported initially. After few checkpoints
are created, cancel and restart the job from a previous checkpoint.

*Note*: The event stream really doesn't have out of order data. Job restart
from a checkpoint causes this artificial out of order events because of the
watermark value.
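
For reference, the assigner from the steps above looks roughly like this (a sketch; TraceEvent and getAnnotationTime() are taken from the ProcessFunction shown earlier and assumed to return event time in milliseconds):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class TraceTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[TraceEvent](Time.seconds(60)) {
  // the watermark trails the largest timestamp seen so far by 60s; it is not part
  // of the checkpoint, which is why it restarts at Long.MIN_VALUE after recovery
  override def extractTimestamp(e: TraceEvent): Long = e.getAnnotationTime()
}

// usage: stream.assignTimestampsAndWatermarks(new TraceTimestampExtractor)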

Regards,
Nara




On Tue, May 29, 2018 at 7:54 PM, sihua zhou  wrote:

> Hi Nara,
>
> yes, the watermark in TimerService is not covered by the checkpoint,
> everytime the job is restarted from a previous checkpoint, it is reset to
> Long.MIN_VALUE. I can see it a bit tricky to cover it into the checkpoint,
> especially when we need to support rescaling(it seems not like a purely
> keyed or a operate state), maybe @Stefan or @Aljoscha could give you more
> useful information about why it wasn't covered by the checkpoint.
>
> Best, Sihua
>
>
>
> On 05/30/2018 05:44,Narayanan Arunachalam
>  wrote:
>
> Hi,
>
> Is it possible the watermark in TimerService not getting reset when a job
> is restarted from a previous checkpoint? I would expect the watermark in a
> TimerService also to go back in time.
>
> I have the following ProcessFunction implementation.
>
>   override def processElement(
> e: TraceEvent,
> ctx: ProcessFunction[
>   TraceEvent,
>   Trace
> ]#Context,
> out: Collector[Trace]
>   ): Unit = {
>
> if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>   registry.counter("tracing.outOfOrderEvents").increment()
> } else {
> 
> }
>
> I am noticing the outOfOrderEvents are reported until new events are read
> in to the stream since the last restart.
>
> Regards,
> Nara
>
>


Re: TimerService/Watermarks and Checkpoints

2018-05-30 Thread Fabian Hueske
Hi Nara and Sihua,

That's indeed an unexpected behavior and it would be good to identify the
reason for the late data.

As Sihua said, watermarks are currently not checkpointed and reset to
Long.MIN_VALUE upon restart.
AFAIK, the main reason why WMs are not checkpointed is that the special
type of operator state that is required for this (union-list state) wasn't
available when the mechanism was implemented.
I think there are plans to address this shortcoming (see FLINK-5601 [1]).

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-5601

2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
narayanan.arunacha...@gmail.com>:

> Thanks Sihua. If it's reset to Long.MIN_VALUE I can't explain why
> outOfOrderEvents are reported. Because the event time on the data will
> always be greater than Long.MIN_VALUE.
>
> Following are the steps to reproduce this scenario.
> - A source to produce events with timestamps that is increasing for every
> event produced
> - Use TimeCharacteristic.EventTime
> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set
> to 60s.
> - Enable checkpoints
> - ProcessFunction impl to report a counter to some metrics backend when
> the timestamp of the event is less than currentWatermark
> - No out of order events will be reported initially. After few checkpoints
> are created, cancel and restart the job from a previous checkpoint.
>
> *Note*: The event stream really doesn't have out of order data. Job
> restart from a checkpoint causes this artificial out of order events
> because of the watermark value.
>
> Regards,
> Nara
>
>
>
>
> On Tue, May 29, 2018 at 7:54 PM, sihua zhou  wrote:
>
>> Hi Nara,
>>
>> yes, the watermark in TimerService is not covered by the checkpoint,
>> everytime the job is restarted from a previous checkpoint, it is reset to
>> Long.MIN_VALUE. I can see it a bit tricky to cover it into the checkpoint,
>> especially when we need to support rescaling(it seems not like a purely
>> keyed or a operate state), maybe @Stefan or @Aljoscha could give you more
>> useful information about why it wasn't covered by the checkpoint.
>>
>> Best, Sihua
>>
>>
>>
>> On 05/30/2018 05:44,Narayanan Arunachalam> l...@gmail.com>  wrote:
>>
>> Hi,
>>
>> Is it possible the watermark in TimerService not getting reset when a job
>> is restarted from a previous checkpoint? I would expect the watermark in a
>> TimerService also to go back in time.
>>
>> I have the following ProcessFunction implementation.
>>
>>   override def processElement(
>> e: TraceEvent,
>> ctx: ProcessFunction[
>>   TraceEvent,
>>   Trace
>> ]#Context,
>> out: Collector[Trace]
>>   ): Unit = {
>>
>> if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>   registry.counter("tracing.outOfOrderEvents").increment()
>> } else {
>> 
>> }
>>
>> I am noticing the outOfOrderEvents are reported until new events are read
>> in to the stream since the last restart.
>>
>> Regards,
>> Nara
>>
>>
>


Multiple Task Slots support in Flink 1.5

2018-05-30 Thread Abdul Qadeer
Hi!

I came across the following point in the release notes of the 1.5 version:

"The allocation of TaskManagers with multiple slots is not fully supported
yet."

Does this mean the support for it will come as a patch for 1.5, or will it
be in the next stable release?
If I use legacy mode, will that support multiple slots per TaskManager, or
is it only the deployment change that is affected?


Re: env.execute() ?

2018-05-30 Thread Shuyi Chen
I think you might be looking for the functionality provided by the
ClusterClient [1]. But I am not sure if I fully understand the meaning of
"do internally in sync with application". Maybe you can give a concrete use
case, so we can help better, if the ClusterClient is not what you want.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/client/program/ClusterClient.html
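
For the command-line side, the usual savepoint workflow is roughly the following (job id, directory and jar are placeholders):

bin/flink savepoint <jobId> [targetDirectory]   # trigger a savepoint for a running job
bin/flink cancel -s [targetDirectory] <jobId>   # cancel the job and take a savepoint
bin/flink run -s <savepointPath> <job.jar>      # resume a new run from that savepoint

The ClusterClient linked above is the programmatic counterpart, which is the closest thing to driving this "internally" from your own code.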

On Wed, May 30, 2018 at 3:18 AM, Esa Heikkinen  wrote:

> Hi
>
>
>
> Ok. Thanks for the clarification. But the controlling of savepoints is
> only possible by command line (or a script) ? Or is it possible to do
> internally in sync with application ?
>
>
>
> Esa
>
>
>
> *From:* Shuyi Chen 
> *Sent:* Wednesday, May 30, 2018 8:18 AM
> *To:* Esa Heikkinen 
> *Cc:* Fabian Hueske ; user@flink.apache.org
> *Subject:* Re: env.execute() ?
>
>
>
> Hi Esa,
>
>
>
> I think having more than one env.execute() is anti-pattern in Flink.
>
>
>
> env.execute() behaves differently depending on the env. For local, it will
> generate the flink job graph, and start a local mini cluster in background
> to run the job graph directly.
> For remote case, it will generate the flink job graph and submit it to a
> remote cluster, e.g. running on YARN/Mesos, the local process might stay
> attached or detach to the job on the remote cluster given options. So it's
> not a simple "unstoppable forever loop", and I dont think the "stop
> env.execute() and then do something and after that restart it" will work in
> general.
>
>
>
> But I think you can take a look at savepoints [1] and checkpoints [2] in
> Flink. With savepoints, you can stop the running job, and do something
> else, and restart from the savepoints to resume the processing.
>
>
>
>
>
> [1]  https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/ops/state/savepoints.html
>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/
> checkpoints.html
>
>
>
> Thanks
>
> Shuyi
>
>
>
> On Tue, May 29, 2018 at 3:56 AM, Esa Heikkinen <
> esa.heikki...@student.tut.fi> wrote:
>
> Hi
>
>
>
> Are there only one env.execute() in application ?
>
>
>
> Is it unstoppable forever loop ?
>
>
>
> Or can I stop env.execute() and then do something and after that restart
> it ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Tuesday, May 29, 2018 1:35 PM
> *To:* Esa Heikkinen 
> *Cc:* user@flink.apache.org
> *Subject:* Re: env.execute() ?
>
>
>
> Hi,
>
>
>
> It is mandatory for all DataStream programs and most DataSet programs.
>
>
>
> Exceptions are ExecutionEnvironment.print() and
> ExecutionEnvironment.collect().
>
> Both methods are defined on the DataSet ExecutionEnvironment and call
> execute() internally.
>
>
>
> Best, Fabian
>
>
>
> 2018-05-29 12:31 GMT+02:00 Esa Heikkinen :
>
> Hi
>
>
>
> Is it env.execute() mandatory at the end of application ? It is possible
> to run the application without it ?
>
>
>
> I found some examples where it is missing.
>
>
>
> Best, Esa
>
>
>
>
>
>
>
> --
>
> "So you have to trust that the dots will somehow connect in your future."
>



-- 
"So you have to trust that the dots will somehow connect in your future."


Re: TimerService/Watermarks and Checkpoints

2018-05-30 Thread Narayanan Arunachalam
Thanks for the explanation. I looked at this metric closely and noticed
there are some events arriving out of order. My hypothesis is that when the
job is restarted, all of the small out-of-order chunks add up and show a
significant number. The graph below shows the number of out-of-order events
every minute. The job was started with new state at 11:53 am and then
restarted from the previous checkpoint at 1:24 pm.

That said, after the restart the out-of-order event count is very high though
:thinking_face:





On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske  wrote:

> Hi Nara and Sihua,
>
> That's indeed an unexpected behavior and it would be good to identify the
> reason for the late data.
>
> As Sihua said, watermarks are currently not checkpointed and reset to
> Long.MIN_VALUE upon restart.
> AFAIK, the main reason why WMs are not checkpointed is that the special
> type of operator state that is required for this (union-list state) wasn't
> available when the mechanism was implemented.
> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-5601
>
> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
> narayanan.arunacha...@gmail.com>:
>
>> Thanks Sihua. If it's reset to Long.MIN_VALUE I can't explain why
>> outOfOrderEvents are reported. Because the event time on the data will
>> always be greater than Long.MIN_VALUE.
>>
>> Following are the steps to reproduce this scenario.
>> - A source to produce events with timestamps that is increasing for every
>> event produced
>> - Use TimeCharacteristic.EventTime
>> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set
>> to 60s.
>> - Enable checkpoints
>> - ProcessFunction impl to report a counter to some metrics backend when
>> the timestamp of the event is less than currentWatermark
>> - No out of order events will be reported initially. After few
>> checkpoints are created, cancel and restart the job from a previous
>> checkpoint.
>>
>> *Note*: The event stream really doesn't have out of order data. Job
>> restart from a checkpoint causes this artificial out of order events
>> because of the watermark value.
>>
>> Regards,
>> Nara
>>
>>
>>
>>
>> On Tue, May 29, 2018 at 7:54 PM, sihua zhou  wrote:
>>
>>> Hi Nara,
>>>
>>> yes, the watermark in TimerService is not covered by the checkpoint,
>>> everytime the job is restarted from a previous checkpoint, it is reset to
>>> Long.MIN_VALUE. I can see it a bit tricky to cover it into the checkpoint,
>>> especially when we need to support rescaling(it seems not like a purely
>>> keyed or a operate state), maybe @Stefan or @Aljoscha could give you more
>>> useful information about why it wasn't covered by the checkpoint.
>>>
>>> Best, Sihua
>>>
>>>
>>>
>>> On 05/30/2018 05:44,Narayanan Arunachalam>> l...@gmail.com>  wrote:
>>>
>>> Hi,
>>>
>>> Is it possible the watermark in TimerService not getting reset when a
>>> job is restarted from a previous checkpoint? I would expect the watermark
>>> in a TimerService also to go back in time.
>>>
>>> I have the following ProcessFunction implementation.
>>>
>>>   override def processElement(
>>> e: TraceEvent,
>>> ctx: ProcessFunction[
>>>   TraceEvent,
>>>   Trace
>>> ]#Context,
>>> out: Collector[Trace]
>>>   ): Unit = {
>>>
>>> if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>>   registry.counter("tracing.outOfOrderEvents").increment()
>>> } else {
>>> 
>>> }
>>>
>>> I am noticing the outOfOrderEvents are reported until new events are
>>> read in to the stream since the last restart.
>>>
>>> Regards,
>>> Nara
>>>
>>>
>>
>


JVM metrics disappearing after job crash, restart

2018-05-30 Thread Nikolas Davis
Howdy,

We are seeing our task manager JVM metrics disappear over time. This last
time we correlated it to our job crashing and restarting. I wasn't able to
grab the failing exception to share. Any thoughts?

We track metrics through the MetricReporter interface. As far as I can tell
this more or less only affects the JVM metrics. I.e. most / all other
metrics continue reporting fine as the job is automatically restarted.

Nik Davis
Software Engineer
New Relic


Re: JVM metrics disappearing after job crash, restart

2018-05-30 Thread Ajay Tripathy
How are your metrics dimensionalized/named? Task managers often have UIDs
generated for them. The task id dimension will change on restart. If you
name your metric based on this 'task_id' there would be a discontinuity
with the old metric.

On Wed, May 30, 2018 at 4:49 PM, Nikolas Davis  wrote:

> Howdy,
>
> We are seeing our task manager JVM metrics disappear over time. This last
> time we correlated it to our job crashing and restarting. I wasn't able to
> grab the failing exception to share. Any thoughts?
>
> We track metrics through the MetricReporter interface. As far as I can
> tell this more or less only affects the JVM metrics. I.e. most / all other
> metrics continue reporting fine as the job is automatically restarted.
>
> Nik Davis
> Software Engineer
> New Relic
>


Re: Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-05-30 Thread Bowen Li
Did you run Flink on AWS EMR or somewhere else? Have you read and followed
instructions on
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#amazon-web-services-aws
?



On Wed, May 30, 2018 at 7:08 AM, Fabian Wollert  wrote:

> Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a
> filesystem backend. I configured the following:
>
> state.backend=filesystem
> state.backend.fs.checkpointdir=s3:///mybucket/
> state.checkpoints.dir=s3:///mybucket/
> state.checkpoints.num-retained=3
>
> I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.
>
> I get now though the following error message:
>
> Caused by: java.lang.NullPointerException: null uri host.
> at java.util.Objects.requireNonNull(Objects.java:228)
> at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.
> s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
> at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.
> s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
> at org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(
> S3FileSystemFactory.java:133)
>
> I tried to dig deeper into the source code, but struggled to find
>
>- what is meant with this URI
>- where to configure it
>
> Can anybody give some advice how to set up the S3 Backend with the new
> shaded lib jar?
>
> Thanks in advance
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fabian.woll...@zalando.de
>
> Tamara-Danz-Straße 1
> 
> 10243 Berlin
> 
> Fax: +49 (0)30 2759 46 93
> E-mail: legalnot...@zalando.co.uk
> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
> 2000889349
>
> Management Board:
> Robert Gentz, David Schneider, Rubin Ritter
>
> Chairman of the Supervisory Board:
> Lothar Lanz
>
> Person responsible for providing the contents of Zalando SE acc. to Art.
> 55 RStV [Interstate Broadcasting Agreement]: Rubin Ritter
> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
> VAT registration number: DE 260543043
>


Re: JVM metrics disappearing after job crash, restart

2018-05-30 Thread Nikolas Davis
We keep track of metrics by using the value of
MetricGroup::getMetricIdentifier, which returns the fully qualified metric
name. The query that we use to monitor metrics filters for metrics IDs that
match '%Status.JVM.Memory%'. As long as the new metrics come online via the
MetricReporter interface then I think the chart would be continuous; we
would just see the old JVM memory metrics cycle into new metrics.
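
For illustration, a minimal reporter sketch (the class name is a placeholder) that logs the fully qualified identifier whenever a metric is registered or removed; watching the added/removed events for identifiers containing Status.JVM.Memory should show whether the JVM metrics are re-registered after the restart or silently dropped:

import org.apache.flink.metrics.{Metric, MetricConfig, MetricGroup}
import org.apache.flink.metrics.reporter.MetricReporter

class LoggingReporter extends MetricReporter {
  override def open(config: MetricConfig): Unit = ()
  override def close(): Unit = ()

  override def notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    println(s"added   ${group.getMetricIdentifier(name)}")

  override def notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    println(s"removed ${group.getMetricIdentifier(name)}")
}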

Nik Davis
Software Engineer
New Relic

On Wed, May 30, 2018 at 5:30 PM, Ajay Tripathy  wrote:

> How are your metrics dimensionalized/named? Task managers often have UIDs
> generated for them. The task id dimension will change on restart. If you
> name your metric based on this 'task_id' there would be a discontinuity
> with the old metric.
>
> On Wed, May 30, 2018 at 4:49 PM, Nikolas Davis 
> wrote:
>
>> Howdy,
>>
>> We are seeing our task manager JVM metrics disappear over time. This last
>> time we correlated it to our job crashing and restarting. I wasn't able to
>> grab the failing exception to share. Any thoughts?
>>
>> We track metrics through the MetricReporter interface. As far as I can
>> tell this more or less only affects the JVM metrics. I.e. most / all other
>> metrics continue reporting fine as the job is automatically restarted.
>>
>> Nik Davis
>> Software Engineer
>> New Relic
>>
>
>


Gluster as file system for state backend

2018-05-30 Thread Chirag Dewan
Hi,
I am evaluating some file systems as a state backend. I can see that Flink 
currently supports S3, MapR FS and HDFS as file systems. 
However, I was hoping I could use Gluster as my state backend, since it is already 
part of our existing ecosystem. Since I have stateful operators in my job and I 
expect each operator to accumulate a large amount of state (in RocksDB) within 
10 seconds (the checkpointing interval), I am looking for a write-optimized file 
system.

Does anyone have prior experience with Gluster as a file system for the state backend? Are 
there any caveats with using a replicated/striped-replicated volume for Flink 
state data?
Thanks,
Chirag
  

Re: NPE in flink sql over-window

2018-05-30 Thread Yan Zhou [FDS Science]
Thanks for the reply.


Yes, it only happens if I configure the idle state retention times. The error 
occurs the first time, before any recovery. In Flink 1.4.x I ran with rowtime 
rather than proctime, so I am not sure whether proctime would cause problems 
there as well.
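
For context, the retention setting in question is configured along these lines (a sketch, assuming a StreamTableEnvironment called tableEnv and placeholder times; the query config is then passed when emitting the result table):

import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api.StreamQueryConfig

val qConfig: StreamQueryConfig = tableEnv.queryConfig
// state of keys that stay idle longer than the maximum retention time is cleaned up
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))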


I am adding some trace logging for ProcTimeBoundedRangeOver. I will update with my 
test results and file a JIRA after that.


Best

Yan


From: Fabian Hueske 
Sent: Wednesday, May 30, 2018 1:43:01 AM
To: Dawid Wysakowicz
Cc: user
Subject: Re: NPE in flink sql over-window

Hi,

Dawid's analysis is certainly correct, but looking at the code this should not 
happen.

I have a few questions:
- You said this only happens if you configure idle state retention times, right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz 
mailto:dwysakow...@apache.org>>:

Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a list 
of elements that was already cleared and does not check against null. Could you 
please file a JIRA for that?


Best,

Dawid

On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:

I also get a warning that the CodeCache is full around that time. It's printed by 
the JVM and doesn't have a timestamp. But I suspect that it's because of the many 
failure recoveries from checkpoints and the SQL queries being dynamically compiled 
too many times.



Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been 
disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size 
using -XX:ReservedCodeCacheSize=
CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
bounds [0x7fa4fd00, 0x7fa50c00, 0x7fa50c00]
total_blobs=54308 nmethods=53551 adapters=617
compilation: disabled (not enough contiguous free space left)





From: Yan Zhou [FDS Science] 
Sent: Tuesday, May 29, 2018 10:52:18 PM
To: user@flink.apache.org
Subject: NPE in flink sql over-window


Hi,

I am using flink sql 1.5.0. My application throws NPE. And after it recover 
from checkpoint automatically, it throws NPE immediately from same line of code.


My application read message from kafka, convert the datastream into a table, 
issue an Over-window aggregation and write the result into a sink. NPE throws 
from class ProcTimeBoundedRangeOver. Please see exception log at the bottom.


The exceptions always happens after the application started for 
maxIdleStateRetentionTime time.  What could be the possible causes?


Best

Yan


2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task 
- over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 
8640 PRECEDI
NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> 
select:
(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: 
Unnamed (3/15) (327
efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
TimerException{java.lang.NullPointerException}
   at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
   at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
   at 
org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
   at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
   at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
   at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
   at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)







Is Flink:1.5 Docker image broken?

2018-05-30 Thread Chirag Dewan
Hi,
The flink:latest Docker image doesn't seem to work. I am not able to access the 
Flink Dashboard after deploying it on Kubernetes.
Anyone else facing the issue?
Thanks,
Chirag 

Re: Is Flink:1.5 Docker image broken?

2018-05-30 Thread Alexandru Gutan
Well, those are unofficial, so you might raise a corresponding issue on
GitHub (since the images are there) for that.

On 31 May 2018 at 08:09, Chirag Dewan  wrote:

> Hi,
>
> flink:latest docker image doesn't seem to work. I am not able to access
> the Flink Dashboard after deploying it on Kubernetes.
>
> Anyone else facing the issue?
>
> Thanks,
>
> Chirag
>