[jira] [Created] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-08 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8617:

 Summary: Fix code generation bug while accessing Map type
 Key: FLINK-8617
 URL: https://issues.apache.org/jira/browse/FLINK-8617
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


There's a code generation bug in {{ScalarOperators.generateMapGet}}, and two 
more bugs were found in {{ScalarOperators.generateIsNull}} and 
{{ScalarOperators.generateIsNotNull}}.
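
For illustration, a hypothetical query shape that would exercise map access 
together with the null checks (table and column names are invented, not taken 
from this report):

{code}
// Hypothetical illustration only: a query that goes through map-access code
// generation (generateMapGet) together with the IS [NOT] NULL checks
// (generateIsNull / generateIsNotNull). All names are made up.
public class MapAccessExample {
    public static void main(String[] args) {
        String query =
            "SELECT attributes['color'] AS color "
                + "FROM Items "
                + "WHERE attributes['color'] IS NOT NULL";
        System.out.println(query);
    }
}
{code}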





[jira] [Created] (FLINK-8616) Missing null check in OperatorChain#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-8616:


 Summary: Missing null check in OperatorChain#pushToOperator masks 
ClassCastException
 Key: FLINK-8616
 URL: https://issues.apache.org/jira/browse/FLINK-8616
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Cliff Resnick


There is an attempt to enrich the exception with outputTag#getId, but outputTag 
is null, so a NullPointerException is thrown instead. Looking at the attempted 
message enrichment, the code seems to assume that a ClassCastException can only 
stem from side-output type mismatches. This may have been the norm before, but 
the changes to classloader delegation in 1.4 have given rise to multiple 
ClassLoader conflicts (at least for us), and they all seem to end up here.
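
A minimal, self-contained sketch of the masking pattern and a guarded fix (the 
class and method below are simplified stand-ins, not Flink's actual 
OperatorChain code):

{code}
public class MaskedExceptionExample {

    // Simplified stand-in for org.apache.flink.util.OutputTag.
    static class OutputTag {
        String getId() { return "side-output-1"; }
    }

    static RuntimeException enrich(ClassCastException cause, OutputTag tag) {
        if (tag == null) {
            // The missing guard: without this check, tag.getId() below throws
            // a NullPointerException that hides the original ClassCastException.
            return cause;
        }
        return new RuntimeException(
            "Could not forward element of side output '" + tag.getId() + "'", cause);
    }

    public static void main(String[] args) {
        ClassCastException cce = new ClassCastException("X cannot be cast to Y");
        System.out.println(enrich(cce, null));            // original exception preserved
        System.out.println(enrich(cce, new OutputTag())); // enriched side-output message
    }
}
{code}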





Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Thomas Weise

On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai wrote:

> Regarding the two hooks you would like to be available:
>
>
>- Provide hook to override discovery (not to hit Kinesis from every
>subtask)
>
> Yes, I think we can easily provide a way, for example setting -1 for
> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> Though, the user would then have to savepoint and restore in order to pick
> up new shards after a Kinesis stream reshard (which is in practice the best
> way to by-pass the Kinesis API rate limitations).
> +1 to provide that.
>

I'm considering a customization of KinesisDataFetcher with override for
discoverNewShardsToSubscribe. We still want shards to be discovered, just
not by hitting Kinesis from every subtask.
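
A rough, self-contained sketch of that idea (plain stand-ins, not the actual
KinesisDataFetcher API, which would first need to expose an overridable method):

import java.util.Collections;
import java.util.List;

// Stand-in for KinesisDataFetcher; shard ids are plain strings here.
class DataFetcher {
    // In the real consumer, discoverNewShardsToSubscribe() calls the Kinesis
    // API from every subtask.
    List<String> discoverNewShardsToSubscribe() {
        return listShardsViaKinesisApi();
    }

    List<String> listShardsViaKinesisApi() {
        return Collections.emptyList();
    }
}

class ExternallyFedDataFetcher extends DataFetcher {
    private final List<String> publishedShards;

    ExternallyFedDataFetcher(List<String> publishedShards) {
        this.publishedShards = publishedShards;
    }

    @Override
    List<String> discoverNewShardsToSubscribe() {
        // Shards are still discovered, just not by every subtask hitting
        // Kinesis: one discovery component publishes the list, subtasks read it.
        return publishedShards;
    }
}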


>
>
>- Provide hook to support custom watermark generation (somewhere
>around KinesisDataFetcher.emitRecordAndUpdateState)
>
> Per-partition watermark generation on the Kinesis side is slightly more
> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> I think we need to additionally allow new shards to be consumed only after
> its parent shard is fully read, otherwise “per-shard time characteristics”
> can be broken because of this out-of-orderness consumption across the
> boundaries of a closed parent shard and its child.
> These JIRAs [1][2] have a bit more detail on the topic.
> Otherwise, in general I'm also +1 to providing this in the Kinesis
> consumer.
>

Here I'm thinking of customizing emitRecordAndUpdateState (the method would
need to be made non-final), using getSubscribedShardsState with additional
transient state to keep track of a watermark per shard, and emitting watermarks
as appropriate.

That's the idea - haven't written any code for it yet.
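
To make that concrete, a minimal, self-contained sketch of the per-shard
watermark bookkeeping (plain Java stand-ins, not the actual fetcher API):

import java.util.HashMap;
import java.util.Map;

// Tracks a watermark per shard; the emitted (global) watermark is the minimum
// over all subscribed shards, so it only advances once every shard catches up.
class PerShardWatermarkTracker {
    private final Map<String, Long> shardWatermarks = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    // Called per record; returns a new watermark to emit, or null if unchanged.
    Long onRecord(String shardId, long eventTimestamp, long maxOutOfOrderness) {
        shardWatermarks.merge(shardId, eventTimestamp - maxOutOfOrderness, Math::max);
        long min = shardWatermarks.values().stream()
            .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}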

Thanks,
Thomas


[jira] [Created] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Karrtik Iyer (JIRA)
Karrtik Iyer created FLINK-8615:

 Summary: Configuring Apache Flink Local Set up with Pseudo 
distributed Yarn
 Key: FLINK-8615
 URL: https://issues.apache.org/jira/browse/FLINK-8615
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
 Environment: Mac OS, Hadoop 2.8.3 and Flink 1.4.0
Reporter: Karrtik Iyer


I have set up Hadoop 2.8.3 and YARN on my Mac machine in pseudo-distributed 
mode, following this blog: [Hadoop In Pseudo Distributed 
Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html].
 I have been able to successfully start HDFS and YARN, and to submit MapReduce 
jobs.

After that I downloaded Apache Flink 1.4 from 
[here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz].
 Now I am trying to set up Flink on the above YARN cluster by following the 
steps 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html].
 When I run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I get the error below, 
which I am unable to resolve. Can someone please advise? 

 

{code}
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of virtual cores per node were configured with 1 but Yarn only has -1 virtual cores available. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
{code}

 





[jira] [Created] (FLINK-8614) Enable Flip-6 per default

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8614:


 Summary: Enable Flip-6 per default
 Key: FLINK-8614
 URL: https://issues.apache.org/jira/browse/FLINK-8614
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8471, the next step is to enable Flip-6 per default by 
setting the configuration switch to {{flip6}}.





[jira] [Created] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8613:


 Summary: Return excess container in YarnResourceManager
 Key: FLINK-8613
 URL: https://issues.apache.org/jira/browse/FLINK-8613
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, YARN
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{YarnResourceManager}} should return excess containers which the Yarn 
ResourceManager wrongly assigned.





[jira] [Created] (FLINK-8612) Add non-detached job mode

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8612:


 Summary: Add non-detached job mode
 Key: FLINK-8612
 URL: https://issues.apache.org/jira/browse/FLINK-8612
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to support the non-detached job mode, the {{MiniDispatcher}} has to 
wait until it has served the {{JobResult}} of a completed job at least once 
before it terminates.





[jira] [Created] (FLINK-8611) Add result future to JobManagerRunner

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8611:


 Summary: Add result future to JobManagerRunner
 Key: FLINK-8611
 URL: https://issues.apache.org/jira/browse/FLINK-8611
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


Adding a {{CompletableFuture}} result future to the {{JobManagerRunner}} will 
allow returning a {{JobResult}} future for a still running job. This is helpful 
for the implementation of a non-detached job mode.
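
A minimal sketch of the idea (stand-in types; {{Result}} is a placeholder for 
the actual job result type):

{code}
import java.util.concurrent.CompletableFuture;

class Result {}

class RunnerSketch {
    private final CompletableFuture<Result> resultFuture = new CompletableFuture<>();

    // Callers can obtain the future while the job is still running ...
    CompletableFuture<Result> getResultFuture() {
        return resultFuture;
    }

    // ... and it completes once the job reaches a terminal state, which is
    // exactly what a non-detached job mode needs to wait on.
    void jobReachedTerminalState(Result result) {
        resultFuture.complete(result);
    }
}
{code}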





[jira] [Created] (FLINK-8610) Remove RestfulGateway from JobMasterGateway

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8610:


 Summary: Remove RestfulGateway from JobMasterGateway
 Key: FLINK-8610
 URL: https://issues.apache.org/jira/browse/FLINK-8610
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the 
{{RestfulGateway}}. Therefore, we should remove it.





[jira] [Created] (FLINK-8609) Add support to deploy detached job mode clusters

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8609:


 Summary: Add support to deploy detached job mode clusters
 Key: FLINK-8609
 URL: https://issues.apache.org/jira/browse/FLINK-8609
 Project: Flink
  Issue Type: New Feature
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy 
detached job mode clusters.





[jira] [Created] (FLINK-8608) Add MiniDispatcher for job mode

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8608:


 Summary: Add MiniDispatcher for job mode
 Key: FLINK-8608
 URL: https://issues.apache.org/jira/browse/FLINK-8608
 Project: Flink
  Issue Type: New Feature
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to properly support the job mode, we need a {{MiniDispatcher}} which 
is started with a pre-initialized {{JobGraph}} and launches a single 
{{JobManagerRunner}} with this job. Once the job is completed and if the 
{{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
terminate.





[jira] [Created] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8607:

 Summary: Add a basic embedded SQL CLI client
 Key: FLINK-8607
 URL: https://issues.apache.org/jira/browse/FLINK-8607
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


This issue describes Implementation Plan 1 of FLIP-24.

Goal: Add the basic features to play around with Flink's streaming SQL.

{code}
- Add CLI component that reads the configuration files
  - "Pre-registered table sources"
  - "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI SQL parser
  - SHOW TABLES
  - DESCRIBE TABLE
  - EXPLAIN
- Add streaming append query submission to executor
  - Submit jars and run SELECT query using the ClusterClient
  - Collect results on heap and serve them on the CLI side (Internal Mode with SELECT)
  - SOURCE (for executing a SQL statement stored in a local file)
{code}





[jira] [Created] (FLINK-8606) Clean up Table API for Java users

2018-02-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8606:

 Summary: Clean up Table API for Java users
 Key: FLINK-8606
 URL: https://issues.apache.org/jira/browse/FLINK-8606
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Java users are able to see many variables and methods that are declared 
{{private[flink]}} or even {{protected}} in Scala. Classes such as 
{{TableEnvironment}} look very messy from the outside in Java. We should clean 
up the API and remove {{private[flink]}} or {{protected}} wherever possible.





[jira] [Created] (FLINK-8605) Enable job cancellation from the web UI

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8605:


 Summary: Enable job cancellation from the web UI
 Key: FLINK-8605
 URL: https://issues.apache.org/jira/browse/FLINK-8605
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to enable the job cancellation from the web UI (including YARN) we 
have to register the {{JobTerminationHandler}} under 
{{/jobs/:jobid/yarn-cancel}} and {{/jobs/:jobid/yarn-stop}}.





[jira] [Created] (FLINK-8604) Move JobTerminationHandler to WebMonitorEndpoint

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8604:


 Summary: Move JobTerminationHandler to WebMonitorEndpoint
 Key: FLINK-8604
 URL: https://issues.apache.org/jira/browse/FLINK-8604
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{JobTerminationHandler}} is currently only registered in the 
{{DispatcherRestEndpoint}}. However, it should also be possible to terminate 
jobs from the web UI. In order to support this, the {{JobTerminationHandler}} 
has to be registered at the {{WebMonitorEndpoint}}.





[jira] [Created] (FLINK-8603) Split RestClusterClient#submitJob into submitJob and requestJobResult

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8603:


 Summary: Split RestClusterClient#submitJob into submitJob and 
requestJobResult
 Key: FLINK-8603
 URL: https://issues.apache.org/jira/browse/FLINK-8603
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.5.0








Re: Re: Confused about method naming in heartbeatFromJobManager or heartbeatFromResourceManager

2018-02-08 Thread Till Rohrmann
Yes, this is how it works: the TaskExecutor gets polled for its heartbeat
response, if you wish. Of course, if it does not get polled often enough,
then the JobMaster or RM can also time out.
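
A minimal, self-contained sketch of that request/response pattern (simplified
stand-ins, not Flink's actual HeartbeatManager/RPC classes):

// The monitor (JobMaster/ResourceManager) initiates; the target only answers.
interface HeartbeatTarget {
    void requestHeartbeat(String monitorId);
}

class Monitor {
    void poll(HeartbeatTarget target) {
        // The periodic "heartbeatFrom..." RPC originates here.
        target.requestHeartbeat("jobMaster-1");
    }

    void receiveHeartbeat(String targetId, String payload) {
        System.out.println(targetId + " answered: " + payload);
        // No answer within the timeout => the monitor marks the target as lost;
        // conversely, a target that is not polled in time marks the monitor as lost.
    }
}

class TaskExecutorStub implements HeartbeatTarget {
    private final Monitor monitor;

    TaskExecutorStub(Monitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public void requestHeartbeat(String monitorId) {
        // Respond only; never initiate.
        monitor.receiveHeartbeat("taskExecutor-1", "slot report");
    }
}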

On Thu, Feb 8, 2018 at 1:22 PM, mingleizhang  wrote:

> Thanks Till. But sorry, what?! The TaskExecutor is only responding to
> heartbeat requests but not actively sending them out? I think the
> taskexecutor should report its status to the jobmanager and resourcemanager
> by heartbeat, with the TaskExecutor acting like a slave and the jobmanager
> and resourcemanager acting as masters. That is what I think now. Am I
> wrong? Please explain; this seems strange to me.
>
> Thanks
> Rice.
>
>
>
>
>
> At 2018-02-08 20:16:05, "Till Rohrmann"  wrote:
> >The RPC methods `heartbeatFromXYZ` are the incoming heartbeats from the
> >JobMaster and ResourceManager, respectively. That's why they are called
> >heartbeatFrom. The TaskExecutor is only responding to heartbeat requests
> >but not actively sending them out.
> >
> >Cheers,
> >Till
> >
> >On Thu, Feb 8, 2018 at 1:06 PM, mingleizhang  wrote:
> >
> >> Attached is the taskmanager design picture. From the picture, I understand
> >> that the taskmanager periodically sends heartbeats to the jobmanager (also
> >> called jobmaster in flip6 now) and the resourcemanager. But when I looked
> >> at the source code below in TaskExecutor.java, I was confused by the
> >> function names. Shouldn't they be *heartbeatToJobManager* and
> >> *heartbeatToResourceManager*? And in TaskExecutorGateway.java there is a
> >> comment, "Heartbeat request from job manager". Both of these confused me a
> >> lot. Could anyone tell me why they are named like that?
> >>
> >> Thanks
> >> Rice.
> >>
> >> // --
> >> // Heartbeat RPC
> >> // --
> >>
> >> @Override
> >> public void heartbeatFromJobManager(ResourceID resourceID) {
> >>jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
> >> }
> >>
> >> @Override
> >> public void heartbeatFromResourceManager(ResourceID resourceID) {
> >>resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
> >> }
> >>
> >> /**
> >>  * Heartbeat request from the job manager.
> >>  *
> >>  * @param heartbeatOrigin unique id of the job manager
> >>  */
> >> void heartbeatFromJobManager(ResourceID heartbeatOrigin);
> >>
> >> /**
> >>  * Heartbeat request from the resource manager.
> >>  *
> >>  * @param heartbeatOrigin unique id of the resource manager
> >>  */
> >> void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>


Re:Re: Confused about method naming in heartbeatFromJobManager or heartbeatFromResourceManager

2018-02-08 Thread mingleizhang
Thanks Till. But sorry, what?! The TaskExecutor is only responding to 
heartbeat requests but not actively sending them out? I think the taskexecutor 
should report its status to the jobmanager and resourcemanager by heartbeat, 
with the TaskExecutor acting like a slave and the jobmanager and 
resourcemanager acting as masters. That is what I think now. Am I wrong? 
Please explain; this seems strange to me.


Thanks
Rice.







At 2018-02-08 20:16:05, "Till Rohrmann"  wrote:
>The RPC methods `heartbeatFromXYZ` are the incoming heartbeats from the
>JobMaster and ResourceManager, respectively. That's why they are called
>heartbeatFrom. The TaskExecutor is only responding to heartbeat requests
>but not actively sending them out.
>
>Cheers,
>Till
>
>On Thu, Feb 8, 2018 at 1:06 PM, mingleizhang  wrote:
>
>> Attached is the taskmanager design picture. From the picture, I understand
>> that the taskmanager periodically sends heartbeats to the jobmanager (also
>> called jobmaster in flip6 now) and the resourcemanager. But when I looked
>> at the source code below in TaskExecutor.java, I was confused by the
>> function names. Shouldn't they be *heartbeatToJobManager* and
>> *heartbeatToResourceManager*? And in TaskExecutorGateway.java there is a
>> comment, "Heartbeat request from job manager". Both of these confused me a
>> lot. Could anyone tell me why they are named like that?
>>
>> Thanks
>> Rice.
>>
>> // --
>> // Heartbeat RPC
>> // --
>>
>> @Override
>> public void heartbeatFromJobManager(ResourceID resourceID) {
>>jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
>> }
>>
>> @Override
>> public void heartbeatFromResourceManager(ResourceID resourceID) {
>>resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
>> }
>>
>> /**
>>  * Heartbeat request from the job manager.
>>  *
>>  * @param heartbeatOrigin unique id of the job manager
>>  */
>> void heartbeatFromJobManager(ResourceID heartbeatOrigin);
>>
>> /**
>>  * Heartbeat request from the resource manager.
>>  *
>>  * @param heartbeatOrigin unique id of the resource manager
>>  */
>> void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
>>
>>
>>
>>
>>


Re: Confused about method naming in heartbeatFromJobManager or heartbeatFromResourceManager

2018-02-08 Thread Till Rohrmann
The RPC methods `heartbeatFromXYZ` are the incoming heartbeats from the
JobMaster and ResourceManager, respectively. That's why they are called
heartbeatFrom. The TaskExecutor is only responding to heartbeat requests
but not actively sending them out.

Cheers,
Till

On Thu, Feb 8, 2018 at 1:06 PM, mingleizhang  wrote:

> Attached is the taskmanager design picture. From the picture, I understand
> that the taskmanager periodically sends heartbeats to the jobmanager (also
> called jobmaster in flip6 now) and the resourcemanager. But when I looked at
> the source code below in TaskExecutor.java, I was confused by the function
> names. Shouldn't they be *heartbeatToJobManager* and
> *heartbeatToResourceManager*? And in TaskExecutorGateway.java there is a
> comment, "Heartbeat request from job manager". Both of these confused me a
> lot. Could anyone tell me why they are named like that?
>
> Thanks
> Rice.
>
> // --
> // Heartbeat RPC
> // --
>
> @Override
> public void heartbeatFromJobManager(ResourceID resourceID) {
>jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
> }
>
> @Override
> public void heartbeatFromResourceManager(ResourceID resourceID) {
>resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
> }
>
> /**
>  * Heartbeat request from the job manager.
>  *
>  * @param heartbeatOrigin unique id of the job manager
>  */
> void heartbeatFromJobManager(ResourceID heartbeatOrigin);
>
> /**
>  * Heartbeat request from the resource manager.
>  *
>  * @param heartbeatOrigin unique id of the resource manager
>  */
> void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
>
>
>
>
>


Confused about method naming in heartbeatFromJobManager or heartbeatFromResourceManager

2018-02-08 Thread mingleizhang
Attached is the taskmanager design picture. From the picture, I understand 
that the taskmanager periodically sends heartbeats to the jobmanager (also 
called jobmaster in flip6 now) and the resourcemanager. But when I looked at 
the source code below in TaskExecutor.java, I was confused by the function 
names. Shouldn't they be heartbeatToJobManager and heartbeatToResourceManager? 
And in TaskExecutorGateway.java there is a comment, "Heartbeat request from 
job manager". Both of these confused me a lot. Could anyone tell me why they 
are named like that?


Thanks
Rice.


// --
// Heartbeat RPC
// --

@Override
public void heartbeatFromJobManager(ResourceID resourceID) {
jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
/**
 * Heartbeat request from the job manager.
 *
 * @param heartbeatOrigin unique id of the job manager
 */
void heartbeatFromJobManager(ResourceID heartbeatOrigin);

/**
 * Heartbeat request from the resource manager.
 *
 * @param heartbeatOrigin unique id of the resource manager
 */
void heartbeatFromResourceManager(ResourceID heartbeatOrigin);

Re: Terminating streaming test

2018-02-08 Thread Aljoscha Krettek
Hi,

The job is not explicitly stopped, bringing down the cluster will also bring 
down the job. (Which is maybe not the nicest way of doing things but it works.)

Sources can trigger pipeline termination by returning from their run() method.
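
For example, a minimal bounded source along those lines (standard
SourceFunction API; the record count is arbitrary):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits numRecords values and then returns from run(), which marks the source
// as finished so the whole pipeline can terminate on its own.
public class BoundedCountSource implements SourceFunction<Long> {

    private final long numRecords;
    private volatile boolean running = true;

    public BoundedCountSource(long numRecords) {
        this.numRecords = numRecords;
    }

    @Override
    public void run(SourceContext<Long> ctx) {
        for (long i = 0; i < numRecords && running; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
        // Returning here ends the source; downstream operators finish after
        // draining the remaining records.
    }

    @Override
    public void cancel() {
        running = false;
    }
}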

Best,
Aljoscha

> On 7. Feb 2018, at 21:15, Thomas Weise  wrote:
> 
> Thanks! It would indeed be nice to have this as a framework that makes tests
> fun and easy to write ;-)
> 
> Looking at SavepointMigrationTestBase, I see that the cluster is eventually
> stopped in teardown, but I don't find where the individual job is
> terminated after the expected results are in? Also, CheckingRestoringSource
> will run until cancelled, is there a way where the source can trigger
> pipeline termination?
> 
> Thanks,
> Thomas
> 
> 
> On Wed, Feb 7, 2018 at 7:56 AM, Aljoscha Krettek wrote:
> 
>> There is StatefulJobSavepointMigrationITCase, which executes a proper
>> unbounded pipeline on a locally started cluster and "listens" for some
>> criteria via accumulators before cancelling the job and shutting down the
>> cluster. The communication with the cluster is quite custom here, but I
>> would really like to have a framework that comes with Flink that allows
>> writing such tests. Somewhat similar to how PAssert works in Beam.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 7. Feb 2018, at 04:34, Thomas Weise  wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Thanks! I would expect more folks to run into this and hence surprised to
>>> not find this in LocalStreamEnvironment. Is there a reason for that?
>>> 
>>> In the specific case, we have an unbounded source (Kinesis), but for
>>> testing we would like to make it bounded. Hence the earlier question
>>> whether it is possible to terminate a topology with a final watermark or
>>> different means from within the source, similar to how a bounded source
>> in
>>> Beam would behave.
>>> 
>>> Thanks,
>>> Thomas
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Feb 6, 2018 at 5:16 PM, Ken Krugler wrote:
>>> 
 Hi Thomas,
 
 Normally the streaming job will terminate when the sources are exhausted
 and all records have been processed.
 
 I assume you have some unbounded source(s), thus this doesn’t work for
 your case.
 
 We’d run into a similar situation with a streaming job that has
>> iterations.
 
 Our solution was your option #1 below, where we created a modified
>> version
 of LocalStreamEnvironment  that supports async execution.
 
 — Ken
 
 
> On Feb 6, 2018, at 4:21 PM, Thomas Weise  wrote:
> 
> Hi,
> 
> I'm looking for an example of an integration test that runs a streaming
 job
> and terminates when the expected result becomes available. I could
>> think
 of
> 2 approaches:
> 
> 1. Modified version of LocalStreamEnvironment that executes the job
> asynchronously and polls for the result or
> 
> 2. Source that emits a final watermark that causes the topology to
> terminate after the watermark has traversed the topology. Is that
 possible
> with Flink?
> 
> But probably this is a rather common testing need that's already
>> solved?!
> 
> Thanks,
> Thomas
 
 --
 Ken Krugler
 http://www.scaleunlimited.com
 custom big data solutions & training
 Hadoop, Cascading, Cassandra & Solr
 
 
>> 
>> 



Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Regarding the two hooks you would like to be available:

- Provide hook to override discovery (not to hit Kinesis from every subtask)
Yes, I think we can easily provide a way, for example setting -1 for 
SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
Though, the user would then have to savepoint and restore in order to pick up 
new shards after a Kinesis stream reshard (which is in practice the best way to 
by-pass the Kinesis API rate limitations).
+1 to provide that.
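
For example, a sketch of that opt-out (the config constant exists today; the
-1 semantics would be the new part):

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

class DisableShardDiscoverySketch {
    Properties buildConsumerConfig() {
        Properties consumerConfig = new Properties();
        // Proposed, not yet supported: "-1" would disable the periodic
        // shard discovery loop.
        consumerConfig.setProperty(
            ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
        return consumerConfig;
    }
}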

- Provide hook to support custom watermark generation (somewhere around 
KinesisDataFetcher.emitRecordAndUpdateState)
Per-partition watermark generation on the Kinesis side is slightly more complex 
than Kafka, due to how Kinesis’s dynamic resharding works.
I think we need to additionally allow new shards to be consumed only after its 
parent shard is fully read, otherwise “per-shard time characteristics” can be 
broken because of this out-of-orderness consumption across the boundaries of a 
closed parent shard and its child.
These JIRAs [1][2] have a bit more detail on the topic.
Otherwise, in general I'm also +1 to providing this in the Kinesis 
consumer.
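
A small, self-contained sketch of that ordering constraint (plain stand-ins,
not the fetcher's actual shard state):

import java.util.HashSet;
import java.util.Set;

// A child shard only becomes eligible for consumption once its parent shard
// has been read to its end, preserving per-shard time ordering across reshards.
class ShardGate {
    private final Set<String> finishedShards = new HashSet<>();

    void markFinished(String shardId) {
        finishedShards.add(shardId);
    }

    boolean mayConsume(String shardId, String parentShardId) {
        return parentShardId == null || finishedShards.contains(parentShardId);
    }
}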

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-5697
[2] https://issues.apache.org/jira/browse/FLINK-6349

On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:

- Provide hook to override discovery (not to hit Kinesis from every subtask)
- Provide hook to support custom watermark generation (somewhere around 
KinesisDataFetcher.emitRecordAndUpdateState)

Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Hi Thomas,

It’s great that you’ve brought out these issues, which IMO are all very valid. 
They have also been in my head for a while.

Here’s a list of things, out of the top of my head, that I would really like to 
improve as part of a major Kafka / Kinesis connector rework.
Some have JIRAs for them already, or were discussed in some other indirectly 
related JIRA. It might make sense to open an umbrella ticket and consolidate 
all of them there.

- Common abstraction for partition-based, replayable sources, which handles 1) 
position checkpointing, 2) partition discovery / topic subscription (using the 
file source pattern), 3) custom startup positions, 4) per-partition watermarks, 
and 5) partition idleness.
- Configuration for the connectors is not well-defined. Some of it goes through 
provided properties, some requires using setter methods, etc. Moreover, it is 
actually very confusing for some users that we share the Properties to carry 
Flink connector-specific configurations, as well as the internally used client 
configuration [1]. I think in this aspect, Beam's KafkaIO has a nice API [2] 
when it comes to this.
- Some default behaviors of the current connectors, such as partitioning and 
flushing on the producer sides, and offset-committing for the Kafka consumer, 
do not make sense [3] [4].
- The deserialization / serialization schema together with the partitioner 
interfaces don't really play well together. For example, the `getTargetTopic` 
method should really be part of the partitioner [5].

I think we are now in a good position to try making this happen for 1.6. Once 
1.5 is out of the way, I can try opening an umbrella JIRA and collecting 
everything there so we can discuss further.

Cheers,
Gordon

[1] 
https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
[2] 
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
[3] https://issues.apache.org/jira/browse/FLINK-5728
[4] https://issues.apache.org/jira/browse/FLINK-5704
[5] https://issues.apache.org/jira/browse/FLINK-6288

On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:

Generalizing the pattern would be great. I was also wondering if there aren't 
other commonalities between sources that would benefit from a shared framework. 
Kafka and Kinesis don't look all that different from a consumer perspective: 
replayable source, topic -> stream, partition -> shard, offset -> sequence, 
dynamic discovery, state saving - shouldn't there be more common code?

Meanwhile, we need to find a way to address shortcomings in the current Kinesis 
connector to enable the use case. I would prefer to do that without permanently 
forking the connector code, so here are some more thoughts:
- Provide hook to override discovery (not to hit Kinesis from every subtask)
- Provide hook to support custom watermark generation (somewhere around 
KinesisDataFetcher.emitRecordAndUpdateState)
If we can accomplish these in short order, it would be great. The current 
implementation makes it hard/impossible to override certain behaviors (final 
protected methods and the like). If there is agreement then I would like to 
address those as a quick PR.

Thanks,
Thomas


On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek  wrote:
Hi,

That last point is very valid. For a while now I've wanted to generalise the 
pattern of our file source to other sources. (This is related to how Beam 
sources are being refactored to use Splittable DoFn.)

I'm very eager for design work to start on this once 1.5 is out the door. There 
are some other folks (cc'ed) who have also talked/thought about this before.

Best,
Aljoscha

> On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
>
> In addition to the lack of watermark support, the Kinesis consumer suffers
> from a discovery-related issue that also needs to be resolved. Shard discovery
> runs periodically in all subtasks. That's not just inefficient but becomes
> a real problem when there is a large number of subtasks due to rate
> limiting (
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> The discovery interval should be minimized to cap latency (new shards not
> consumed until discovered).
>
> How about moving discovery out of the fetcher into a separate singleton
> source and then broadcast the result to the parallel fetchers, following
> the pattern applied to file input?
>
> https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
>
> This would also ensure that all subtasks consistently see the same shard
> list.
>
> Thoughts?
>
> Thanks,
> Thomas
>
>
> On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise  wrote:
>
>> Hi,
>>
>> The Kinesis consumer 

Re: [DISCUSS] Releasing Flink 1.5.0

2018-02-08 Thread Till Rohrmann
Local state recovery is almost completely done. Only some reviews and
merging of the final PRs is pending.

The network stack improvements are on a good way to be finished by the end
of this week or the beginning of next week. To my knowledge we recently got
green Travis builds :-) The network stack changes will also include the
application-level flow control and the back-pressure-based checkpoint
alignment. So only the last reviews and merging are missing.

Concerning Flip-6, I'm currently working on enabling Flip-6 by default.
There are still some smaller things left to be done but I'm confident that
we can resolve them quickly.

I agree that due to the big changes we should have a very thorough and
principled testing period where we put Flink through the paces.

Cheers,
Till

On Wed, Feb 7, 2018 at 10:55 AM, Chesnay Schepler wrote:

> As Aljoscha said we wanted to do 1.5 soon after 1.4 based on the
> assumption that the 3 big features (FLIP-6, network stack changes, local
> state recovery) are nearly done.
>
> I'm unsure about local state recovery, but I still see open issues for
> FLIP-6 and the network stack rework.
> As such it doesn't make sense to release 1.5 now.
>
> Given the large scope of these features I would very much prefer to have
> them active on master for a while before a feature-freeze
> to expose them to a wider audience.
>
> IMO it will take at least another month before we can start the release
> process for 1.5, i.e. the feature freeze.
> (2 more weeks for implementation, 2 weeks on master for the dust to settle)
>
>
> On 05.02.2018 22:39, Kostas Kloudas wrote:
>
>> Hi Aljoscha,
>>
>> I believe that support for Broadcast State should also be in 1.5.
>> There is an open PR https://github.com/apache/flink/pull/5230 <
>> https://github.com/apache/flink/pull/5230> for that
>> and there are some pending issues related to scala api and documentation.
>>
>> Thanks,
>> Kostas
>>
>> On Feb 5, 2018, at 5:37 PM, Timo Walther  wrote:
>>>
>>> Hi Shuyi,
>>>
>>> I will take a look at it again this week. I'm pretty sure it will be
>>> part of 1.5.0.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 2/5/18 at 5:25 PM, Shuyi Chen wrote:
>>>
 Hi Aljoscha, can we get this feature in for 1.5.0? We have a lot of
 internal users waiting for this feature.

 [FLINK-7923 ] Support
 accessing subfields of a Composite element in an Object Array type
 column

 Thanks a lot
 Shuyi


 On Mon, Feb 5, 2018 at 6:59 AM, Christophe Jolif wrote:

 Hi guys,
>
> Sorry for jumping in, but I think
>
> [FLINK-8101] Elasticsearch 6.X support
> [FLINK-7386]  Flink Elasticsearch 5 connector is not compatible with
> Elasticsearch 5.2+ client
>
> have long been awaited, and there was one PR from me and one from someone
> else showing the interest ;) So if you could consider them for 1.5, that
> would be great!
>
> Thanks!
> --
> Christophe
>
> On Mon, Feb 5, 2018 at 2:47 PM, Timo Walther wrote:
>
> Hi Aljoscha,
>>
>> it would be great if we can include the first version of the SQL
>> client
>> (see FLIP-24, Implementation Plan 1). I will open a PR this week. I
>> think
>> we can merge this with explicit "experimental/alpha" status. It is far
>>
> away
>
>> from feature completeness but will be a great tool for Flink
>> beginners.
>>
>> In order to use the SQL client we would need to also add some table
>> sources with the new unified table factories (FLINK-8535), but this is
>> optional because a user can implement own table factories at the
>>
> beginning.
>
>> Regards,
>> Timo
>>
>>
>> On 2/5/18 at 2:36 PM, Tzu-Li (Gordon) Tai wrote:
>>
>> Hi Aljoscha,
>>
>>> Thanks for starting the discussion.
>>>
>>> I think there’s a few connector related must-have improvements that
>>> we
>>> should get in before the feature freeze, since quite a few users have
>>>
>> been
>
>> asking for them:
>>>
>>> [FLINK-6352] FlinkKafkaConsumer should support to use timestamp to
>>> set
>>>
>> up
>
>> start offset
>>> [FLINK-5479] Per-partition watermarks in FlinkKafkaConsumer should
>>> consider idle partitions
>>> [FLINK-8516] Pluggable shard-to-subtask partitioning for
>>> FlinkKinesisConsumer
>>> [FLINK-6109] Add a “checkpointed offset” metric to FlinkKafkaConsumer
>>>
>>> These are still missing in the master branch. Only FLINK-5479 is
>>> still
>>> lacking a pull request.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 31 January 2018 at 10:38:43 AM, Aljoscha Krettek (
>>>
>> aljos...@apache.org)
>
>> wrote:
>>> Hi Everyone,
>>>
>>> When we 

Re: build/devops

2018-02-08 Thread Fabian Hueske
A full build incl. tests takes about 40 to 50 minutes on my machine (if all
dependencies are locally cached).
Locally, I only run tests on the modules that I modified, as Chesnay said.

2018-02-08 8:05 GMT+01:00 Chesnay Schepler :

> Generally we run the full test suite on Travis for every pull request. I
> would guess that devs working on some part
> only run certain tests locally, though.
>
>
> On 08.02.2018 01:06, cw7k wrote:
>
>> Hi, a full mvn install takes about 2-2.5 hours.  Does the Flink team run
>> the full testsuite on every dev pull request, or use a main dev branch that
>> gets the full test periodically?  Or a different method?  The 2 hours can
>> be a drag when one team needs another's changes and each attempt at passing
>> requires a wait of 2+ hours.
>>
>
>
>


[jira] [Created] (FLINK-8602) Accelerate recover from failover when use incremental checkpoint

2018-02-08 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8602:

 Summary: Accelerate recover from failover when use incremental 
checkpoint
 Key: FLINK-8602
 URL: https://issues.apache.org/jira/browse/FLINK-8602
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, when incremental checkpointing is enabled and the user changes the 
parallelism, `hasExtraKeys` may be `true`. If this occurs, Flink will loop over 
all RocksDB instances and iterate over all of their data to fetch the entries 
that fall into the current `KeyGroupRange`. This can be improved:

if a state handle's `KeyGroupRange` is fully covered by the current 
`KeyGroupRange`, we can open its corresponding RocksDB instance directly.
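
A small sketch of the proposed fast-path check ({{Range}} is a stand-in for 
Flink's {{KeyGroupRange}}):

{code}
// Stand-in for KeyGroupRange with inclusive bounds.
class Range {
    final int start;
    final int end;

    Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    // True if `other` lies fully inside this range. In that case the RocksDB
    // instance behind the state handle can be opened directly, instead of
    // iterating over all entries and re-inserting the covered key groups.
    boolean fullyCovers(Range other) {
        return start <= other.start && other.end <= end;
    }
}
{code}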

 


