[jira] [Commented] (FLINK-7307) Add proper command line parsing tool to ClusterEntrypoint

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126833#comment-16126833
 ] 

ASF GitHub Bot commented on FLINK-7307:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4476
  
@tillrohrmann @aljoscha I have fixed 
https://issues.apache.org/jira/browse/FLINK-7307 in this PR. Could you 
please have a look when you're free? Thanks.


> Add proper command line parsing tool to ClusterEntrypoint
> -
>
> Key: FLINK-7307
> URL: https://issues.apache.org/jira/browse/FLINK-7307
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>  Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point's 
> {{ClusterEntrypoint#parseArguments}} method. At the moment, we are simply 
> using the {{ParameterTool}} as a temporary solution.
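
As an illustration of what proper parsing could look like, here is a minimal
sketch using Apache Commons CLI (the option names are hypothetical, not the
actual entrypoint flags):

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class EntrypointCliSketch {

	// Hypothetical options; the real flags are up to the implementation.
	private static final Options OPTIONS = new Options()
		.addOption("c", "configDir", true, "Flink configuration directory")
		.addOption("h", "host", true, "Hostname to bind to");

	public static CommandLine parse(String[] args) throws ParseException {
		// Unlike the ParameterTool stopgap, this fails fast with a
		// descriptive error for unknown or malformed options.
		return new DefaultParser().parse(OPTIONS, args);
	}
}
{code}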





[GitHub] flink issue #4476: [FLINK-7307] Add proper command line parsing tool to Clus...

2017-08-14 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4476
  
@tillrohrmann @aljoscha I have fixed 
https://issues.apache.org/jira/browse/FLINK-7307 in this PR. Could you 
please have a look when you're free? Thanks.




[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...

2017-08-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4473#discussion_r133119309
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
 ---
@@ -20,14 +20,24 @@
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
+ * @deprecated
--- End diff --

@bowenli86 one thing to mention:
We should always try to have good Javadoc explaining why something is 
deprecated. Sorry, I overlooked this during my review. I'll address it 
myself this time while merging.




[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126820#comment-16126820
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4473#discussion_r133119309
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
 ---
@@ -20,14 +20,24 @@
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
+ * @deprecated
--- End diff --

@bowenli86 one thing to mention:
We should always try to have good Javadoc explaining why something is 
deprecated. Sorry, I overlooked this during my review. I'll address it 
myself this time while merging.


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make the most of KinesisProducer, and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write too fast to 
> Kinesis, which fails Flink jobs too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.
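
For context, a hedged sketch of how such settings would be supplied to the
producer (the keys follow the KPL default_config.properties linked above;
whether the connector forwards them verbatim is an assumption of this sketch,
and all values are illustrative):

{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisProducerConfigSketch {

	public static FlinkKinesisProducer<String> createProducer() {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		// KPL tuning knobs, named as in the KPL sample config.
		config.setProperty("MaxConnections", "24");
		config.setProperty("RateLimit", "100");
		config.setProperty("RecordMaxBufferedTime", "1000");
		config.setProperty("RecordTtl", "30000");
		config.setProperty("RequestTimeout", "6000");
		return new FlinkKinesisProducer<>(new SimpleStringSchema(), config);
	}
}
{code}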





[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126812#comment-16126812
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
LGTM! I'll rebase this on #4537, and will merge as soon as Travis gives 
green.


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make the most of KinesisProducer, and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write too fast to 
> Kinesis, which fails Flink jobs too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.





[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
LGTM! I'll rebase this on #4537, and will merge as soon as Travis gives 
green.




[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...

2017-08-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4537
  
@bowenli86 @zentol Thanks a lot for the reviews! I will address your 
comments and merge this.




[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126800#comment-16126800
 ] 

ASF GitHub Bot commented on FLINK-7440:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4537
  
@bowenli86 @zentol Thanks a lot for the reviews! I will address your 
comments and merge this.


> Add eager serializable checks on provided de-/serialization schemas for 
> Kinesis consumer / producer
> ---
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0, 1.3.3
>
>
> For better user experience, we should add eager serializable checks on the 
> provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, 
> with better error messages pointing out exactly that the serialization schema 
> isn't serializable.
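
A minimal sketch of such an eager check (one possible approach, using Flink's
{{ClosureCleaner}} utility; the actual implementation in the PR may differ):

{code:java}
import org.apache.flink.api.java.ClosureCleaner;

public final class SerializabilityChecks {

	// Called from the consumer/producer constructor so that a
	// non-serializable schema fails immediately with a clear message,
	// instead of failing later at job submission with an opaque error.
	public static <T> T checkSerializable(T schema, String description) {
		try {
			ClosureCleaner.ensureSerializable(schema);
		} catch (Exception e) {
			throw new IllegalArgumentException(
				"The provided " + description + " is not serializable: "
					+ schema.getClass().getName(), e);
		}
		return schema;
	}
}
{code}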





[jira] [Assigned] (FLINK-4500) Cassandra sink can lose messages

2017-08-14 Thread Michael Fong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Fong reassigned FLINK-4500:
---

Assignee: Michael Fong

> Cassandra sink can lose messages
> 
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there 
> are Cassandra queries in flight from the checkpoint that may fail.  If they do 
> fail, they would subsequently not be replayed when the job recovers, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close should check whether there is a pending exception 
> and throw it, rather than closing silently.  It should also wait for any 
> pending async queries to complete and check their status before closing.
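
A hedged sketch of the suggested close() behavior (the field names and the
notification mechanism are illustrative, not the actual CassandraSinkBase
internals):

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

abstract class GuardedCassandraSink {

	// Incremented before each async send; the future callback decrements
	// it, records any failure, and calls notifyAll() on the counter.
	protected final AtomicInteger pendingRequests = new AtomicInteger();
	protected final AtomicReference<Throwable> asyncError = new AtomicReference<>();

	public void close() throws IOException, InterruptedException {
		// Wait for all in-flight Cassandra queries to complete.
		synchronized (pendingRequests) {
			while (pendingRequests.get() > 0) {
				pendingRequests.wait();
			}
		}
		// Rethrow any error recorded by the async callbacks instead of
		// closing silently.
		Throwable error = asyncError.get();
		if (error != null) {
			throw new IOException("Error while sending data to Cassandra", error);
		}
	}
}
{code}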





[jira] [Assigned] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink

2017-08-14 Thread Michael Fong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Fong reassigned FLINK-6805:
---

Assignee: Michael Fong

> Flink Cassandra connector dependency on Netty disagrees with Flink
> --
>
> Key: FLINK-6805
> URL: https://issues.apache.org/jira/browse/FLINK-6805
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Shannon Carey
>Assignee: Michael Fong
> Fix For: 1.4.0
>
>
> The Flink Cassandra connector has a dependency on Netty libraries (via 
> promotion of transitive dependencies by the Maven shade plugin) at version 
> 4.0.33.Final, which disagrees with the version included in Flink, 
> 4.0.27.Final, which is managed by the parent POM via a dependency on 
> netty-all.
> Due to use of netty-all, the dependency management doesn't take effect on the 
> individual libraries such as netty-handler, netty-codec, etc.
> I suggest that dependency management of Netty should be added for all Netty 
> libraries individually (netty-handler, etc.) so that all Flink modules use 
> the same version, and similarly I suggest that exclusions be added to the 
> quickstart example POM for the individual Netty libraries so that fat JARs 
> don't include conflicting versions of Netty.
> It seems like this problem started when FLINK-6084 was implemented: 
> transitive dependencies of the flink-connector-cassandra were previously 
> omitted, and now that they are included we must make sure that they agree 
> with the Flink distribution.





[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126768#comment-16126768
 ] 

ASF GitHub Bot commented on FLINK-7440:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4537
  
Once this is checked in, we'll be more confident that 
https://github.com/apache/flink/pull/4473 is all good and it doesn't break 
anything


> Add eager serializable checks on provided de-/serialization schemas for 
> Kinesis consumer / producer
> ---
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0, 1.3.3
>
>
> For better user experience, we should add eager serializable checks on the 
> provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, 
> with better error messages pointing out exactly that the serialization schema 
> isn't serializable.





[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...

2017-08-14 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4537
  
Once this is checked in, we'll be more confident that 
https://github.com/apache/flink/pull/4473 is all good and it doesn't break 
anything


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126742#comment-16126742
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
@fhueske Yes, the plural is better. I should have noticed that before. 
The PR has been updated with the new package name, and an extra delay 
parameter has been added to the co-operator.


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.
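
A minimal sketch of what such a mode could look like, based on the method
quoted above (the canEmitWatermark() hook and the heldBackWatermark field are
hypothetical):

{code:java}
// Sketch: instead of forwarding the watermark immediately, hold it back
// until all results it triggered have been emitted.
public void processWatermark(Watermark mark) throws Exception {
	if (timeServiceManager != null) {
		timeServiceManager.advanceWatermark(mark);
	}
	if (canEmitWatermark()) {
		output.emitWatermark(mark);
	} else {
		// Buffer the watermark and emit it once the corresponding
		// results are out.
		heldBackWatermark = mark;
	}
}
{code}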





[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-14 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
@fhueske Yes, the plural is better. I should have noticed that before. 
The PR has been updated with the new package name, and an extra delay 
parameter has been added to the co-operator.




[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126721#comment-16126721
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r133104586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = 
TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute != null =>
--- End diff --

Sure, I have logged it https://issues.apache.org/jira/browse/FLINK-7446


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.
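
For the last point, a sketch of an operator that copies the event-time
timestamp from the Row into the {{StreamRecord}} (the field index and the
assumption that the rowtime is stored as a Long are illustrative):

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

public class SetRowtimeOperator extends AbstractStreamOperator<Row>
		implements OneInputStreamOperator<Row, Row> {

	private final int timestampFieldIndex;

	public SetRowtimeOperator(int timestampFieldIndex) {
		this.timestampFieldIndex = timestampFieldIndex;
	}

	@Override
	public void processElement(StreamRecord<Row> element) throws Exception {
		long rowtime = (Long) element.getValue().getField(timestampFieldIndex);
		// Re-emit the record with the timestamp taken from the Row field.
		output.collect(element.replace(element.getValue(), rowtime));
	}
}
{code}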





[jira] [Created] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-14 Thread Jark Wu (JIRA)
Jark Wu created FLINK-7446:
--

 Summary: Support to define an existing field as the rowtime field 
for TableSource
 Key: FLINK-7446
 URL: https://issues.apache.org/jira/browse/FLINK-7446
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Jark Wu


Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field for 
a {{TableSource}}. But it would be helpful if we could also support defining an 
existing field as the rowtime field. Just as when registering a DataStream, the 
rowtime field could either be appended or replace an existing field.
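
A sketch of how this could look from the source's side (hedged: today the
returned name is appended as a new field, while under this proposal it could
also name an existing one):

{code:java}
import org.apache.flink.table.sources.DefinedRowtimeAttribute;

// Sketch: the TableSource names "ts" as its rowtime attribute. With this
// proposal, "ts" could be an existing field of the source's return type
// that is replaced by the rowtime indicator, rather than a new field
// appended to the schema.
public class OrdersRowtime implements DefinedRowtimeAttribute {

	@Override
	public String getRowtimeAttribute() {
		return "ts";
	}
}
{code}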





[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r133104586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = 
TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute != null =>
--- End diff --

Sure, I have logged it https://issues.apache.org/jira/browse/FLINK-7446




[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126717#comment-16126717
 ] 

ASF GitHub Bot commented on FLINK-7419:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4524
  
@zentol Thank you for your review, I have renamed the pattern, thanks 


> Shade jackson dependency in flink-avro
> --
>
> Key: FLINK-7419
> URL: https://issues.apache.org/jira/browse/FLINK-7419
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Avro uses {{org.codehaus.jackson}}, which also exists in multiple 
> incompatible versions. We should shade it to 
> {{org.apache.flink.shaded.avro.org.codehaus.jackson}}.





[GitHub] flink issue #4524: [FLINK-7419] Shade jackson dependency in flink-avro

2017-08-14 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4524
  
@zentol Thank you for your review, I have renamed the pattern, thanks 




[jira] [Commented] (FLINK-7441) Double quote string literals is not supported in Table API and SQL

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126712#comment-16126712
 ] 

ASF GitHub Bot commented on FLINK-7441:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4538
  
Thanks for the reminder, @zentol. I will pay attention to the commit 
message in the future.


> Double quote string literals is not supported in Table API and SQL
> --
>
> Key: FLINK-7441
> URL: https://issues.apache.org/jira/browse/FLINK-7441
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0, 1.3.3
>
>
> Code generation doesn't handle double-quote string literals and some control 
> characters, which leads to a compile error.
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column 
> 48: Expression "hello" is not an rvalue
> {code}
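
For context, a minimal reproduction along these lines (hedged; the table and
field names are illustrative):

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class DoubleQuoteLiteralRepro {

	public static Table reproduce(StreamTableEnvironment tableEnv) {
		// The double quotes inside the SQL string literal end up
		// unescaped in the generated Java code, triggering the Janino
		// error quoted above.
		return tableEnv.sql("SELECT * FROM MyTable WHERE name = '\"hello\"'");
	}
}
{code}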





[GitHub] flink issue #4538: [FLINK-7441] [table] Double quote string literals is not ...

2017-08-14 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4538
  
Thanks for the reminder, @zentol. I will pay attention to the commit 
message in the future.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126650#comment-16126650
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133091608
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+	private void discardComputationStatesAccordingToStrategy(
+			Queue<ComputationState<T>> computationStates,
+			Collection<Map<String, List<T>>> matchedResult,
+			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set<T> discardEvents = new HashSet<>();
+		switch (afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (List<T> eventList : resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List<ComputationState<T>> discardStates = new ArrayList<>();
+			for (ComputationState<T> computationState : computationStates) {
+				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
+				for (List<T> list : partialMatch.values()) {
+					for (T e : list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter());
+							discardStates.add(computationState);
--- End diff --

Yes, you are right. Thanks for pointing it out!


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function 

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-14 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133091608
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+	private void discardComputationStatesAccordingToStrategy(
+			Queue<ComputationState<T>> computationStates,
+			Collection<Map<String, List<T>>> matchedResult,
+			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set<T> discardEvents = new HashSet<>();
+		switch (afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map<String, List<T>> resultMap : matchedResult) {
+					for (List<T> eventList : resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List<ComputationState<T>> discardStates = new ArrayList<>();
+			for (ComputationState<T> computationState : computationStates) {
+				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
+				for (List<T> list : partialMatch.values()) {
+					for (T e : list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter());
+							discardStates.add(computationState);
--- End diff --

Yes, you are right. Thanks for pointing it out!




[jira] [Commented] (FLINK-7409) WebRuntimeMonitor blocks serving threads

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126587#comment-16126587
 ] 

ASF GitHub Bot commented on FLINK-7409:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4527
  
@tillrohrmann please rebase now that #4492 has been merged


> WebRuntimeMonitor blocks serving threads
> 
>
> Key: FLINK-7409
> URL: https://issues.apache.org/jira/browse/FLINK-7409
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> The {{WebRuntimeMonitor}} contains a lot of blocking operations where it 
> retrieves a result from the {{JobManager}} and then waits on the future to 
> obtain the result. This is not a good design since we are blocking server 
> threads with that. Instead I propose to follow a more reactive approach where 
> the {{RequestHandler}} returns a {{CompletableFuture}} of {{FullHttpResponse}} 
> which is written out to the channel in a completion handler.
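
A minimal sketch of the handler shape this describes (types simplified; the
actual signature in the PR may differ):

{code:java}
import java.util.concurrent.CompletableFuture;

import io.netty.handler.codec.http.FullHttpResponse;

interface ReactiveRequestHandler {

	// The handler returns a future instead of blocking a serving thread.
	// The caller writes the response out in a completion handler, e.g.:
	//   handler.handleRequest(path)
	//       .whenComplete((resp, err) -> ctx.writeAndFlush(resp));
	CompletableFuture<FullHttpResponse> handleRequest(String path);
}
{code}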





[GitHub] flink issue #4527: [FLINK-7409] [web] Make WebRuntimeMonitor reactive

2017-08-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4527
  
@tillrohrmann please rebase now that #4492 has been merged




[GitHub] flink issue #4535: Eventhubs-support read from and write to Azure eventhubs

2017-08-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4535
  
@zhuganghuaonnet we should also mention that new connectors are being 
contributed through the Flink release of [Apache 
Bahir](http://bahir.apache.org).




[jira] [Commented] (FLINK-7443) Store and deserializer fields in MetricFetcher should be final

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126495#comment-16126495
 ] 

ASF GitHub Bot commented on FLINK-7443:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4539
  
+1 but I'm only seeing an improvement and not a bug


> Store and deserializer fields in MetricFetcher should be final
> --
>
> Key: FLINK-7443
> URL: https://issues.apache.org/jira/browse/FLINK-7443
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The {{MetricStore}} and {{MetricDumpDeserializer}} fields in the 
> {{MetricFetcher}} should be final, as they are not meant to be overwritten 
> and are even used for synchronization.
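
For illustration, why a field that is used for synchronization should be
final (a generic sketch, not the MetricFetcher code):

{code:java}
public class Counter {

	// final: the monitor must never be swapped out at runtime, otherwise
	// two threads could synchronize on different objects and race.
	private final Object lock = new Object();

	private long count;

	public void increment() {
		synchronized (lock) {
			count++;
		}
	}
}
{code}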





[GitHub] flink issue #4539: [FLINK-7443] [metrics] MetricFetcher store and deserializ...

2017-08-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4539
  
+1 but I'm only seeing an improvement and not a bug




[jira] [Commented] (FLINK-7445) Remove FLINK-1234 reference from PR template

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126478#comment-16126478
 ] 

ASF GitHub Bot commented on FLINK-7445:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4542
  
+1


> Remove FLINK-1234 reference from PR template
> 
>
> Key: FLINK-7445
> URL: https://issues.apache.org/jira/browse/FLINK-7445
> Project: Flink
>  Issue Type: Improvement
>  Components: GitHub
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The PR template on github contains a reference to FLINK-1234 as an example 
> for the PR title. 
> The problem is that every PR that doesn't fill out the template, or rather 
> does not delete that part of the template, will now be referenced in 
> FLINK-1234, leading to spam on the mailing list.





[GitHub] flink issue #4542: [FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR...

2017-08-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4542
  
+1




[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126423#comment-16126423
 ] 

ASF GitHub Bot commented on FLINK-1234:
---

Github user AjayTripathy closed the pull request at:

https://github.com/apache/flink/pull/4541


> Make Hadoop2 profile default
> 
>
> Key: FLINK-1234
> URL: https://issues.apache.org/jira/browse/FLINK-1234
> Project: Flink
>  Issue Type: Improvement
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.8.0
>
>
> As per mailing list discussion.





[GitHub] flink pull request #4541: Small typo fix in the scope section

2017-08-14 Thread AjayTripathy
Github user AjayTripathy closed the pull request at:

https://github.com/apache/flink/pull/4541




[jira] [Commented] (FLINK-7445) Remove FLINK-1234 reference from PR template

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126413#comment-16126413
 ] 

ASF GitHub Bot commented on FLINK-7445:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4542

[FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template

The PR template on github contains a reference to FLINK-1234 as an example 
for the PR title.

The problem is that every PR that doesn't fill out the template, or rather 
does not delete that part of the template, will now be referenced in 
FLINK-1234, leading to spam on the mailing list.

I've replaced `1234` with ``, which should still get the idea across.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7445

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4542


commit 3449b390823467c058c1a14fd5889b87200fbdad
Author: zentol 
Date:   2017-08-14T21:14:13Z

[FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template




> Remove FLINK-1234 reference from PR template
> 
>
> Key: FLINK-7445
> URL: https://issues.apache.org/jira/browse/FLINK-7445
> Project: Flink
>  Issue Type: Improvement
>  Components: GitHub
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The PR template on github contains a reference to FLINK-1234 as an example 
> for the PR title. 
> The problem is that every PR that doesn't fill out the template, or rather 
> does not delete that part of the template, will now be referenced in 
> FLINK-1234, leading to spam on the mailing list.





[GitHub] flink pull request #4542: [FLINK-7445] [GitHub] Remove FLINK-1234 reference ...

2017-08-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4542

[FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template

The PR template on github contains a reference to FLINK-1234 as an example 
for the PR title.

The problem is that every PR that doesn't fill out the template, or rather 
does not delete that part of the template, will now be referenced in 
FLINK-1234, leading to spam on the mailing list.

I've replaced `1234` with ``, which should still get the idea across.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7445

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4542


commit 3449b390823467c058c1a14fd5889b87200fbdad
Author: zentol 
Date:   2017-08-14T21:14:13Z

[FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template






[jira] [Assigned] (FLINK-7445) Remove FLINK-1234 reference from PR template

2017-08-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-7445:
---

Assignee: Chesnay Schepler

> Remove FLINK-1234 reference from PR template
> 
>
> Key: FLINK-7445
> URL: https://issues.apache.org/jira/browse/FLINK-7445
> Project: Flink
>  Issue Type: Improvement
>  Components: GitHub
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The PR template on github contains a reference to FLINK-1234 as an example 
> for the PR title. 
> The problem is that every PR that doesn't fill out the template, or rather 
> does not delete that part of the template, will now be referenced in 
> FLINK-1234, leading to spam on the mailing list.





[GitHub] flink issue #4541: Small typo fix in the scope section

2017-08-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4541
  
I've merged the commit in 4a4db0a7f9ed2d8f1590589be3a4b43789208e6c; could 
you close this PR?




[GitHub] flink issue #4541: Small typo fix in the scope section

2017-08-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4541
  
Thank you for fixing this, merging the PR.




[jira] [Created] (FLINK-7445) Remove FLINK-1234 reference from PR template

2017-08-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7445:
---

 Summary: Remove FLINK-1234 reference from PR template
 Key: FLINK-7445
 URL: https://issues.apache.org/jira/browse/FLINK-7445
 Project: Flink
  Issue Type: Improvement
  Components: GitHub
Reporter: Chesnay Schepler
Priority: Critical


The PR template on github contains a reference to FLINK-1234 as an example for 
the PR title. 

The problem is that every PR that doesn't fill out the template, or rather does 
not delete that part of the template, will now be referenced in FLINK-1234, 
leading to spam on the mailing list.





[jira] [Comment Edited] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-14 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126380#comment-16126380
 ] 

Fabian Hueske edited comment on FLINK-6233 at 8/14/17 9:00 PM:
---

Hi [~xccui]:

1. Yes, if both implementations can share significant parts of their code 
please do so.
2. No, all clean up timers should be on processing time. This is important for 
consistency with the other operators.
3. The current design is optimized for the RocksDBStateBackend which returns 
keys in order. For RocksDB (and hence for most serious use cases) the current 
implementation is optimal. IMO, we should not change that.
4. We will add an allowed latency parameter later in the QueryConfig. You can 
prepare the operator to handle this case if it does not add substantial code 
complexity.

I think separate timer services are an optimization that could be added later. 
I haven't thought in detail about this, but if I'm not mistaken separate timers 
would only be beneficial in certain combinations of time range join predicates 
and delayed streams and would not improve the performance / reduce the state 
size of a join in all cases. [~xccui] what are your thoughts on this?



was (Author: fhueske):
Hi [~xccui]:

#1 Yes, if both implementations can share significant parts of their code 
please do so.
#1 No, all clean up timers should be on processing time. This is important for 
consistency with the other operators.
#1 The current design is optimized for the RocksDBStateBackend which returns 
keys in order. For RocksDB (and hence for most serious use cases) the current 
implementation is optimal. IMO, we should not change that.
#1 We will add an allowed latency parameter later in the QueryConfig. You can 
prepare the operator to handle this case if it does not add substantial code 
complexity.

I think separate timer services are an optimization that could be added later. 
I haven't thought in detail about this, but if I'm not mistaken separate timers 
would only be beneficial in certain combinations of time range join predicates 
and delayed streams and would not improve the performance / reduce the state 
size of a join in all cases. [~xccui] what are your thoughts on this?


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner join
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute; the time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded ones 
> like {{o.rowtime > s.rowtime}}, and should include both streams' rowtime 
> attributes; {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this 
> would mean that inserting a row into the sorted order would shift all other 
> computations. This would be too expensive to maintain. Therefore, we will 
> throw an error if a user tries to use a row-time stream join with late data 
> handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-14 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126380#comment-16126380
 ] 

Fabian Hueske commented on FLINK-6233:
--

Hi [~xccui]:

#1 Yes, if both implementations can share significant parts of their code 
please do so.
#1 No, all clean up timers should be on processing time. This is important for 
consistency with the other operators.
#1 The current design is optimized for the RocksDBStateBackend which returns 
keys in order. For RocksDB (and hence for most serious use cases) the current 
implementation is optimal. IMO, we should not change that.
#1 We will add an allowed latency parameter later in the QueryConfig. You can 
prepare the operator to handle this case if it does not add substantial code 
complexity.

I think separate timer services are an optimization that could be added later. 
I haven't thought in detail about this, but if I'm not mistaken separate timers 
would only be beneficial in certain combinations of time range join predicates 
and delayed streams and would not improve the performance / reduce the state 
size of a join in all cases. [~xccui] what are your thoughts on this?


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner join
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute; the time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded ones 
> like {{o.rowtime > s.rowtime}}, and should include both streams' rowtime 
> attributes; {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this 
> would mean that inserting a row into the sorted order would shift all other 
> computations. This would be too expensive to maintain. Therefore, we will 
> throw an error if a user tries to use a row-time stream join with late data 
> handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[jira] [Commented] (FLINK-7444) Make external calls non-blocking

2017-08-14 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126361#comment-16126361
 ] 

Till Rohrmann commented on FLINK-7444:
--

It is problematic if the error handler tries to stop the failing 
{{RpcEndpoint}} in a blocking fashion. Then it is basically deadlocked, because 
the actor thread never terminates. We have seen this problem with the 
{{MiniCluster}}, where an {{Exception}} is thrown at shutdown and blocks the 
actor's main thread while the {{MiniCluster}} is being shut down, waiting for 
the {{ActorSystem}} to terminate.

I think the underlying problem is that one does not know what's happening 
outside of the {{RpcEndpoint's}} main thread, and the idea was to guard against 
this by making the calls asynchronous. I see the point that one would want to 
react fast to fatal errors, and maybe the problem is that we are abusing the 
{{FatalErrorHandler}} also for non-fatal errors (e.g. more like an uncaught 
exception handler). Maybe we can introduce different failure cases, but then one 
shouldn't do any blocking operations which require the {{RpcEndpoint}} to be 
terminated in the fatal error case.



> Make external calls non-blocking
> 
>
> Key: FLINK-7444
> URL: https://issues.apache.org/jira/browse/FLINK-7444
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> All external calls from a {{RpcEndpoint}} can be potentially blocking, e.g. 
> calls to the {{FatalErrorHandler}}. Therefore, I propose to make all these 
> calls coming from the {{RpcEndpoint's}} main thread non-blocking by running 
> them in an {{Executor}}. That way the main thread will never be blocked.
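
A minimal sketch of the proposed pattern (the class and method names are
illustrative):

{code:java}
import java.util.concurrent.Executor;

import org.apache.flink.runtime.rpc.FatalErrorHandler;

class NonBlockingErrorReporting {

	private final Executor executor;
	private final FatalErrorHandler fatalErrorHandler;

	NonBlockingErrorReporting(Executor executor, FatalErrorHandler handler) {
		this.executor = executor;
		this.fatalErrorHandler = handler;
	}

	// Hand the potentially blocking callback off to an executor so the
	// RpcEndpoint's main thread can never be blocked (or deadlocked) by it.
	void reportFatalError(Throwable t) {
		executor.execute(() -> fatalErrorHandler.onFatalError(t));
	}
}
{code}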





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126307#comment-16126307
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4530
  
Good proposal @xccui! I'd prefer the plural: 
`org.apache.flink.table.runtime.operators`


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.





[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4530
  
Good proposal @xccui! I'd prefer the plural: 
`org.apache.flink.table.runtime.operators`




[GitHub] flink issue #4541: Small typo fix in the scope section

2017-08-14 Thread AjayTripathy
Github user AjayTripathy commented on the issue:

https://github.com/apache/flink/pull/4541
  
There is no JIRA ticket; as I read in the documentation guidelines, 
trivial fixes and typos require no ticket.




[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126266#comment-16126266
 ] 

ASF GitHub Bot commented on FLINK-1234:
---

GitHub user AjayTripathy opened a pull request:

https://github.com/apache/flink/pull/4541

Small typo fix in the scope section

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AjayTripathy/flink patch-1

[GitHub] flink pull request #4541: Small typo fix in the scope section

2017-08-14 Thread AjayTripathy
GitHub user AjayTripathy opened a pull request:

https://github.com/apache/flink/pull/4541

Small typo fix in the scope section

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AjayTripathy/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4541.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4541

[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126263#comment-16126263
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r133037759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

What would happen if the leftmost table does not have a time attribute (or 
if it is projected out)? I just think that the semantics of the `StreamRecord` 
timestamps are too important to have an implicit behavior that is hard to 
explain and reason about for users. IMO, an exception that asks for explicit 
user input is the better choice compared to a behavior that depends on 
non-obvious query characteristics and is hard to predict.
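
To make that concrete, a hypothetical Table API fragment of the explicit-input alternative (the tables, fields, and tableEnv are invented names; the point is projecting a single rowtime attribute before the conversion):

{code:java}
// keep exactly one rowtime attribute so the StreamRecord timestamp is unambiguous
Table result = left.join(right).where("a = b")
	.select("a, leftRowtime");
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
{code}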


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.
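
A hypothetical operator-level sketch of that last point (SetRowtimeOperator and rowtimeIndex are invented names; the actual change may differ):

{code:java}
// re-emit each record with its rowtime field as the StreamRecord timestamp
public class SetRowtimeOperator extends AbstractStreamOperator<Row>
		implements OneInputStreamOperator<Row, Row> {

	private final int rowtimeIndex; // assumed position of the rowtime field

	public SetRowtimeOperator(int rowtimeIndex) {
		this.rowtimeIndex = rowtimeIndex;
	}

	@Override
	public void processElement(StreamRecord<Row> element) throws Exception {
		long rowtime = (Long) element.getValue().getField(rowtimeIndex);
		output.collect(element.replace(element.getValue(), rowtime));
	}
}
{code}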



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r133037759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

What would happen if the leftmost table does not have a time attribute (or 
if it is projected out)? I just think that the semantics of the `StreamRecord` 
timestamps are too important to have an implicit behavior that is hard to 
explain and reason about for users. IMO, an exception that asks for explicit 
user input is the better choice compared to a behavior that depends on 
non-obvious query characteristics and is hard to predict.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126188#comment-16126188
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133027325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

This class is totally intended to be immutable. So beyond what it is 
currently enforcing, do you suggest using immutable collections inside?
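
A minimal fragment of what "immutable collections inside" could look like here (a sketch under the assumption that the map is fully populated at construction time):

{code:java}
private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;

public TaskStateSnapshot(Map<OperatorID, OperatorSubtaskState> subtaskStates) {
	// defensive copy + unmodifiable view: value iterations (e.g. in size())
	// can then never fail with a ConcurrentModificationException
	this.subtaskStatesByOperatorID =
			Collections.unmodifiableMap(new HashMap<>(subtaskStates));
}
{code}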


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133027325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

This class is totally intended to be immutable. So beyond what it is 
currently enforcing, do you suggest using immutable collections inside?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126176#comment-16126176
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133026125
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this 
operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

Hmm, I think if we consider default load factors and large sizes, I 
would pick a linear array scan with a minimum hit rate above 30% over a 
random-access iteration with a 100% hit rate. For all expected sizes (in cache) 
in this class, it should not matter. A LinkedHashMap also consumes a bit more 
memory. I would tend to keep it this way.
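
For context, an illustrative fragment of the trade-off being discussed (both maps are used identically; only traversal cost and iteration order differ):

{code:java}
Map<OperatorID, OperatorSubtaskState> hashed = new HashMap<>();
Map<OperatorID, OperatorSubtaskState> linked = new LinkedHashMap<>();

// HashMap iteration walks the (possibly sparse) internal table array;
// LinkedHashMap iteration follows an internal linked list in insertion order.
for (OperatorSubtaskState state : linked.values()) {
	// visit state
}
{code}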


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133026125
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this 
operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

Hmm, I think if we consider default load factors and large sizes, I 
would pick a linear array scan with a minimum hit rate above 30% over a 
random-access iteration with a 100% hit rate. For all expected sizes (in cache) 
in this class, it should not matter. A LinkedHashMap also consumes a bit more 
memory. I would tend to keep it this way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133010887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

Would it make sense to make this immutable? It looks like this should not 
be modified any more after fully constructing it. This would also make it clear 
that methods iterating over the state, or returning sets / iterables can never 
fail with concurrent modifications.

For example the `size` method is considered a "best effort" method for info 
purposes only, and should not fail with an exception (it currently could fail 
with a `ConcurrentModificationException`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126154#comment-16126154
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021720
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -850,18 +843,20 @@ public void 
testSuccessfulCheckpointSubsumesUnsuccessful() {
OperatorID opID2 = 
OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
OperatorID opID3 = 
OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());
 
-   Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates1_1 = 
spy(new TaskStateSnapshot());
--- End diff --

Is spying necessary here? There seem to be no `verify()` calls on this 
type...


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021771
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -878,14 +873,17 @@ public void 
testSuccessfulCheckpointSubsumesUnsuccessful() {
}
long checkpointId2 = pending2.getCheckpointId();
 
-   Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates2_1 = 
spy(new TaskStateSnapshot());
--- End diff --

Same as above, spying necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126150#comment-16126150
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133010887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

Would it make sense to make this immutable? It looks like this should not 
be modified any more after fully constructing it. This would also make it clear 
that methods iterating over the state, or returning sets / iterables can never 
fail with concurrent modifications.

For example the `size` method is considered a "best effort" method for info 
purposes only, and should not fail with an exception (it currently could fail 
with a `ConcurrentModificationException`).


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126152#comment-16126152
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133022095
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
 
-   OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
-   OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
-
-   Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
-
-   operatorStates.put(opID1, new SpyInjectingOperatorState(
-   opID1, 
vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
-   operatorStates.put(opID2, new SpyInjectingOperatorState(
-   opID2, 
vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
-
// check that the vertices received the trigger 
checkpoint message
{
verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
}
 
+   OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+   OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+   TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
--- End diff --

Why not create a proper `TaskStateSnapshot` with one entry, rather than 
mocking?
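
A sketch of that suggestion (assuming the class offers a put-style method; putSubtaskStateByOperatorID is used here as a placeholder name):

{code:java}
// build a real snapshot with one entry instead of mock(TaskStateSnapshot.class)
TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
// put-style method name is an assumption, not confirmed by the PR
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, new OperatorSubtaskState());
{code}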


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126151#comment-16126151
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133013315
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}
 
+   boolean hasManagedKeyedState = false;
+   for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
+   OperatorSubtaskState state = 
entry.getValue();
+   if (state != null) {
+   hasManagedKeyedState |= 
state.getManagedKeyedState() != null;
+   }
+   }
+
// should be one k/v state
--- End diff --

"should be **at least** one k/v state"?


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133013315
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}
 
+   boolean hasManagedKeyedState = false;
+   for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
+   OperatorSubtaskState state = 
entry.getValue();
+   if (state != null) {
+   hasManagedKeyedState |= 
state.getManagedKeyedState() != null;
+   }
+   }
+
// should be one k/v state
--- End diff --

"should be **at least** one k/v state"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021720
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -850,18 +843,20 @@ public void 
testSuccessfulCheckpointSubsumesUnsuccessful() {
OperatorID opID2 = 
OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
OperatorID opID3 = 
OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());
 
-   Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates1_1 = 
spy(new TaskStateSnapshot());
--- End diff --

Is spying necessary here? There seem to be no `verify()` calls on this 
type...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126155#comment-16126155
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133018189
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> ...

> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133009796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this 
operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

A `LinkedHashMap` has a slightly more predictable iteration performance 
(list traversal) compared to a `HashMap` (search through sparse table array). 
There are a lot of value iterations done in this class, but we also should have 
pretty full hash tables (since we never delete), so not sure how much 
difference it makes...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133018189
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> ...

[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126149#comment-16126149
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133009796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this 
operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

A `LinkedHashMap` has a slightly more predictable iteration performance 
(list traversal) compared to a `HashMap` (search through sparse table array). 
There are a lot of value iterations done in this class, but we also should have 
pretty full hash tables (since we never delete), so not sure how much 
difference it makes...


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133016663
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -75,31 +103,79 @@
 */
private final long stateSize;
 
+   @VisibleForTesting
+   public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
+
+   this(legacyOperatorState,
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList());
+   }
+
+   /**
+* Empty state.
+*/
+   public OperatorSubtaskState() {
--- End diff --

Minor optimization: One could make this constructor `private` and have a 
field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd 
leave this to you whether you think it worth doing...
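
The suggested optimization might look roughly like this (sketch only; the field name is taken from the comment above):

{code:java}
/** Shared placeholder for empty state. */
public static final OperatorSubtaskState EMPTY = new OperatorSubtaskState();

/**
 * Empty state.
 */
private OperatorSubtaskState() {
}
{code}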


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126153#comment-16126153
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133016663
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -75,31 +103,79 @@
 */
private final long stateSize;
 
+   @VisibleForTesting
+   public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
+
+   this(legacyOperatorState,
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList());
+   }
+
+   /**
+* Empty state.
+*/
+   public OperatorSubtaskState() {
--- End diff --

Minor optimization: One could make this constructor `private` and have a 
field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd 
leave this to you whether you think it worth doing...


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126156#comment-16126156
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021771
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
}
long checkpointId2 = pending2.getCheckpointId();
 
-   Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot());
--- End diff --

Same as above, spying necessary?


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133022095
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
 
-   OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
-   OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
-
-   Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
-
-   operatorStates.put(opID1, new SpyInjectingOperatorState(
-       opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
-   operatorStates.put(opID2, new SpyInjectingOperatorState(
-       opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
-
    // check that the vertices received the trigger checkpoint message
    {
        verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
        verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
    }
 
+   OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+   OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+   TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
--- End diff --

Why not create a proper `TaskStateSnapshot` with one entry, rather than 
mocking?
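
One possible shape of that suggestion, as a sketch; the method name 
`putSubtaskStateByOperatorID` is an assumption about the new class's API:

{code:java}
// Build a real snapshot with a single entry instead of mock()'ing it.
TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(
    OperatorID.fromJobVertexID(vertex1.getJobvertexId()),
    new OperatorSubtaskState());
{code}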


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7394) Implement basic InputChannel for credit-based logic

2017-08-14 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-7394:

Description: 
This is a part of work for credit-based network flow control. 

The basic works are:

* Propose the {{BufferListener}} interface for notifying buffer availability 
and buffer destroyed.
* {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage the 
exclusive buffers itself.
* {{RemoteInputChannel}} implements {{BufferListener}} interface to be notified 
repeatedly.
* {{RemoteInputChannel}} maintains and notifies of unannounced credit.
* {{RemoteInputChannel}} maintains current sender backlog to trigger requests 
of floating buffers.


  was:
This is a part of work for credit-based network flow control. 

The basic works are:

* The exclusive buffers per channel are assigned to {{RemoteInputChannel}} 
directly during registering task.
* {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage the 
exclusive buffers itself.
* {{RemoteInputChannel}} implements {{BufferPoolListener}} interface to be 
notified available floating buffers from buffer pool.
* {{RemoteInputChannel}} maintains unannounced credit and current sender 
backlog.



> Implement basic InputChannel for credit-based logic
> ---
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control. 
> The basic works are:
> * Propose the {{BufferListener}} interface for notifying buffer availability 
> and buffer destroyed.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.
> * {{RemoteInputChannel}} implements {{BufferListener}} interface to be 
> notified repeatedly.
> * {{RemoteInputChannel}} maintains and notifies of unannounced credit.
> * {{RemoteInputChannel}} maintains current sender backlog to trigger requests 
> of floating buffers.
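
A hypothetical sketch of what such a listener could look like (names and 
signatures are illustrative only, not the actual Flink interface):

{code:java}
/** Listener for buffer availability in the credit-based design above. */
public interface BufferListener {

    /** Minimal stand-in for a network buffer. */
    interface Buffer {
        void recycle();
    }

    /**
     * Called when a buffer becomes available again.
     * @return true if the listener wants to be notified repeatedly
     */
    boolean notifyBufferAvailable(Buffer buffer);

    /** Called when the buffer provider is destroyed. */
    void notifyBufferDestroyed();
}
{code}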



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125979#comment-16125979
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4353
  
Concerning the suggestion about the `MultiStreamStateHandle` - I am not 
sure that this can always work. Different physical files may have headers, so 
it may be important to recognize them as different chunks of state in the 
general case.


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4353
  
Concerning the suggestion about the `MultiStreamStateHandle` - I am not 
sure that this can always work. Different physical files may have headers, so 
it may be important to recognize them as different chunks of state in the 
general case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125951#comment-16125951
 ] 

ASF GitHub Bot commented on FLINK-7440:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4537#discussion_r13250
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
@@ -1062,4 +1104,28 @@ public boolean isClearCalled() {
 
return fakeRestoredState;
}
+
+   private final class NonSerializableDeserializationSchema implements 
KinesisDeserializationSchema<String> {
--- End diff --

Can you please add a comment explaining why this is not serializable and 
why the following one is serializable? Basically pointing out this class is not 
static.
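
For reference, a minimal illustration of that point (self-contained example, 
not the actual test classes):

{code:java}
import java.io.Serializable;

public class Outer { // note: Outer itself is not Serializable

    // Fails to serialize: a non-static inner class carries an implicit
    // reference to its enclosing Outer instance, which gets pulled into
    // serialization and throws NotSerializableException.
    private final class NonStaticSchema implements Serializable {
    }

    // Serializes fine: a static nested class has no hidden outer reference.
    private static final class StaticSchema implements Serializable {
    }
}
{code}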


> Add eager serializable checks on provided de-/serialization schemas for 
> Kinesis consumer / producer
> ---
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0, 1.3.3
>
>
> For better user experience, we should add eager serializable checks on the 
> provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, 
> with better error messages pointing out exactly that the serialization schema 
> isn't serializable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...

2017-08-14 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4537
  
@tzulitai LGTM!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4537: [FLINK-7440] [kinesis] Add various eager serializa...

2017-08-14 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4537#discussion_r13250
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
@@ -1062,4 +1104,28 @@ public boolean isClearCalled() {
 
return fakeRestoredState;
}
+
+   private final class NonSerializableDeserializationSchema implements 
KinesisDeserializationSchema<String> {
--- End diff --

Can you please add a comment explaining why this is not serializable and 
why the following one is serializable? Basically pointing out this class is not 
static.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7428) avoid one additional buffer copy when receiving messages

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125913#comment-16125913
 ] 

ASF GitHub Bot commented on FLINK-7428:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4529
  
we also need to double-check this against the previous issue of 
d92e422ec7089376583a8f57043274d236c340a4, which may be solved by the way I am 
using the `LengthFieldBasedFrameDecoder` now compared to back then, or by the 
changes that happened since then.


> avoid one additional buffer copy when receiving messages
> 
>
> Key: FLINK-7428
> URL: https://issues.apache.org/jira/browse/FLINK-7428
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> By using {{LengthFieldBasedFrameDecoder}}, we create one unnecessary (netty) 
> buffer copy in this class which could be easily avoided since we can ensure 
> that the buffer is free to be released after decoding it in the 
> {{NettyMessageDecoder}} and into our own buffer and/or events.
> The solution would be to make {{NettyMessageDecoder}} extend from 
> {{LengthFieldBasedFrameDecoder}} and handle the decoding of the frames and 
> the objects in there. In the frame creation otherwise done by 
> {{LengthFieldBasedFrameDecoder}}, we could use a sliced buffer instead. This 
> solution also makes the channel pipelines a bit simpler.
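
A rough sketch of that approach (Netty 4 API; the actual NettyMessage decoding 
is elided, so treat this as illustrative only):

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.CharsetUtil;

public class FrameAndMessageDecoder extends LengthFieldBasedFrameDecoder {

    public FrameAndMessageDecoder() {
        // maxFrameLength, lengthFieldOffset, lengthFieldLength
        super(Integer.MAX_VALUE, 0, 4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // not enough bytes for a full frame yet
        }
        try {
            // Decode into our own buffer/event so the frame can be released.
            return decodeMessage(frame);
        } finally {
            frame.release();
        }
    }

    @Override
    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        // Return a retained slice instead of the default frame copy.
        return buffer.slice(index, length).retain();
    }

    private Object decodeMessage(ByteBuf frame) {
        // Placeholder for the real NettyMessage decoding.
        return frame.toString(CharsetUtil.UTF_8);
    }
}
{code}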



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...

2017-08-14 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4529
  
we also need to double-check this against the previous issue of 
d92e422ec7089376583a8f57043274d236c340a4, which may be solved by the way I am 
using the `LengthFieldBasedFrameDecoder` now compared to back then, or by the 
changes that happened since then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-6692) The flink-dist jar contains unshaded netty jar

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125770#comment-16125770
 ] 

Stephan Ewen edited comment on FLINK-6692 at 8/14/17 4:17 PM:
--

I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use 
Akka underneath the hood.

I think we can close this, now that we have all {{io.netty.xxx}} out of the way.

[~wheat9] what do you think?


was (Author: stephanewen):
I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use 
Akka underneath the hood.

I think we can close this, now that we have all {{io.netty.xxx} out of the way.

[~wheat9] what do you think?

> The flink-dist jar contains unshaded netty jar
> --
>
> Key: FLINK-6692
> URL: https://issues.apache.org/jira/browse/FLINK-6692
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes:
> {noformat}
> io/netty/handler/codec/http/router/
> io/netty/handler/codec/http/router/BadClientSilencer.class
> io/netty/handler/codec/http/router/MethodRouted.class
> io/netty/handler/codec/http/router/Handler.class
> io/netty/handler/codec/http/router/Router.class
> io/netty/handler/codec/http/router/DualMethodRouter.class
> io/netty/handler/codec/http/router/Routed.class
> io/netty/handler/codec/http/router/AbstractHandler.class
> io/netty/handler/codec/http/router/KeepAliveWrite.class
> io/netty/handler/codec/http/router/DualAbstractHandler.class
> io/netty/handler/codec/http/router/MethodRouter.class
> {noformat}
> {noformat}
> org/jboss/netty/util/internal/jzlib/InfBlocks.class
> org/jboss/netty/util/internal/jzlib/InfCodes.class
> org/jboss/netty/util/internal/jzlib/InfTree.class
> org/jboss/netty/util/internal/jzlib/Inflate$1.class
> org/jboss/netty/util/internal/jzlib/Inflate.class
> org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class
> org/jboss/netty/util/internal/jzlib/JZlib.class
> org/jboss/netty/util/internal/jzlib/StaticTree.class
> org/jboss/netty/util/internal/jzlib/Tree.class
> org/jboss/netty/util/internal/jzlib/ZStream$1.class
> org/jboss/netty/util/internal/jzlib/ZStream.class
> {noformat}
> Is it an expected behavior?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7417) Create flink-shaded-jackson

2017-08-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-7417:
---

Assignee: Chesnay Schepler

> Create flink-shaded-jackson
> ---
>
> Key: FLINK-7417
> URL: https://issues.apache.org/jira/browse/FLINK-7417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The {{com.fasterml:jackson}} library is another culprit of frequent conflicts 
> that we need to shade away.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6982) Replace guava dependencies

2017-08-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6982.
---
Resolution: Fixed

1.4: 8dfb9d00653271ea4adbeb752da8f62d7647b6d8

> Replace guava dependencies
> --
>
> Key: FLINK-6982
> URL: https://issues.apache.org/jira/browse/FLINK-6982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7399) Add checkstyle rule to forbid codehaus jackson imports

2017-08-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7399.
---
Resolution: Fixed

1.4: e30eb13c4a84517ffa502b2c9c7da4b5b8541c9c

> Add checkstyle rule to forbid codehaus jackson imports
> --
>
> Key: FLINK-7399
> URL: https://issues.apache.org/jira/browse/FLINK-7399
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7441) Double quote string literals is not supported in Table API and SQL

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125900#comment-16125900
 ] 

ASF GitHub Bot commented on FLINK-7441:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4538
  
@twalthr @wuchong Please make sure that the commit message describes what 
was changed, and not the state before the change. We now have a commit that 
adds support for quoted strings, with the commit message saying that this isn't 
supported.


> Double quote string literals is not supported in Table API and SQL
> --
>
> Key: FLINK-7441
> URL: https://issues.apache.org/jira/browse/FLINK-7441
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0, 1.3.3
>
>
> Code generation doesn't handle double quote string literals and some control 
> characters which leads to compile error.
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column 
> 48: Expression "hello" is not an rvalue
> {code}
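
The fix boils down to escaping string literals before embedding them in 
generated code; a minimal sketch of such an escaper (not the actual Flink code 
generator):

{code:java}
// Turn a raw string into a valid Java string literal for generated code.
static String toJavaLiteral(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
        switch (c) {
            case '"':  sb.append("\\\""); break;
            case '\\': sb.append("\\\\"); break;
            case '\n': sb.append("\\n");  break;
            case '\r': sb.append("\\r");  break;
            default:   sb.append(c);
        }
    }
    return sb.append('"').toString();
}

// toJavaLiteral("say \"hello\"") yields a literal that compiles as an rvalue.
{code}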



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4538: [FLINK-7441] [table] Double quote string literals is not ...

2017-08-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4538
  
@twalthr @wuchong Please make sure that the commit message describes what 
was changed, and not the state before the change. We now have a commit that 
adds support for quoted strings, with the commit message saying that this isn't 
supported.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7399) Add checkstyle rule to forbid codehaus jackson imports

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125894#comment-16125894
 ] 

ASF GitHub Bot commented on FLINK-7399:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4505


> Add checkstyle rule to forbid codehaus jackson imports
> --
>
> Key: FLINK-7399
> URL: https://issues.apache.org/jira/browse/FLINK-7399
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6982) Replace guava dependencies

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125895#comment-16125895
 ] 

ASF GitHub Bot commented on FLINK-6982:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4503


> Replace guava dependencies
> --
>
> Key: FLINK-6982
> URL: https://issues.apache.org/jira/browse/FLINK-6982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4505: [FLINK-7399] [checkstyle] Forbid imports from org....

2017-08-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4505


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4503: [FLINK-6982] [guava] Integrate flink-shaded-guava-...

2017-08-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4503


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6692) The flink-dist jar contains unshaded netty jar

2017-08-14 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125856#comment-16125856
 ] 

Haohui Mai commented on FLINK-6692:
---

Sounds good. Thanks for the effort!

> The flink-dist jar contains unshaded netty jar
> --
>
> Key: FLINK-6692
> URL: https://issues.apache.org/jira/browse/FLINK-6692
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes:
> {noformat}
> io/netty/handler/codec/http/router/
> io/netty/handler/codec/http/router/BadClientSilencer.class
> io/netty/handler/codec/http/router/MethodRouted.class
> io/netty/handler/codec/http/router/Handler.class
> io/netty/handler/codec/http/router/Router.class
> io/netty/handler/codec/http/router/DualMethodRouter.class
> io/netty/handler/codec/http/router/Routed.class
> io/netty/handler/codec/http/router/AbstractHandler.class
> io/netty/handler/codec/http/router/KeepAliveWrite.class
> io/netty/handler/codec/http/router/DualAbstractHandler.class
> io/netty/handler/codec/http/router/MethodRouter.class
> {noformat}
> {noformat}
> org/jboss/netty/util/internal/jzlib/InfBlocks.class
> org/jboss/netty/util/internal/jzlib/InfCodes.class
> org/jboss/netty/util/internal/jzlib/InfTree.class
> org/jboss/netty/util/internal/jzlib/Inflate$1.class
> org/jboss/netty/util/internal/jzlib/Inflate.class
> org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class
> org/jboss/netty/util/internal/jzlib/JZlib.class
> org/jboss/netty/util/internal/jzlib/StaticTree.class
> org/jboss/netty/util/internal/jzlib/Tree.class
> org/jboss/netty/util/internal/jzlib/ZStream$1.class
> org/jboss/netty/util/internal/jzlib/ZStream.class
> {noformat}
> Is it an expected behavior?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-6692) The flink-dist jar contains unshaded netty jar

2017-08-14 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai resolved FLINK-6692.
---
Resolution: Fixed

> The flink-dist jar contains unshaded netty jar
> --
>
> Key: FLINK-6692
> URL: https://issues.apache.org/jira/browse/FLINK-6692
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes:
> {noformat}
> io/netty/handler/codec/http/router/
> io/netty/handler/codec/http/router/BadClientSilencer.class
> io/netty/handler/codec/http/router/MethodRouted.class
> io/netty/handler/codec/http/router/Handler.class
> io/netty/handler/codec/http/router/Router.class
> io/netty/handler/codec/http/router/DualMethodRouter.class
> io/netty/handler/codec/http/router/Routed.class
> io/netty/handler/codec/http/router/AbstractHandler.class
> io/netty/handler/codec/http/router/KeepAliveWrite.class
> io/netty/handler/codec/http/router/DualAbstractHandler.class
> io/netty/handler/codec/http/router/MethodRouter.class
> {noformat}
> {noformat}
> org/jboss/netty/util/internal/jzlib/InfBlocks.class
> org/jboss/netty/util/internal/jzlib/InfCodes.class
> org/jboss/netty/util/internal/jzlib/InfTree.class
> org/jboss/netty/util/internal/jzlib/Inflate$1.class
> org/jboss/netty/util/internal/jzlib/Inflate.class
> org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class
> org/jboss/netty/util/internal/jzlib/JZlib.class
> org/jboss/netty/util/internal/jzlib/StaticTree.class
> org/jboss/netty/util/internal/jzlib/Tree.class
> org/jboss/netty/util/internal/jzlib/ZStream$1.class
> org/jboss/netty/util/internal/jzlib/ZStream.class
> {noformat}
> Is it an expected behavior?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7223) Increase DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS for Flink-kinesis-connector

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125845#comment-16125845
 ] 

Stephan Ewen commented on FLINK-7223:
-

[~phoenixjiangnan] Do you think it is worthwhile filing an issue at Amazon for 
that?
It seems crazy that the limit on describe requests is so low, and that it 
applies across all apps of one account.


> Increase DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS for Flink-kinesis-connector
> 
>
> Key: FLINK-7223
> URL: https://issues.apache.org/jira/browse/FLINK-7223
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Background: {{DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS}} in 
> {{org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants}}
>  is the default value for Flink to call Kinesis's {{describeStream()}} API.
> Problem: Right now, its value is 10,000 millis (10 sec), which is too short. We 
> ran into problems where the Flink-kinesis-connector's calls of {{describeStream()}} 
> exceeded the Kinesis rate limit and broke the Flink TaskManager.
> According to 
> http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html, 
> "This operation has a limit of 10 transactions per second per account." This 
> means the 10 transactions/second limit applies to a single organization's 
> entire AWS account. We contacted AWS Support, and they confirmed this. If other 
> applications (either other Flink apps or non-Flink apps) compete aggressively 
> with your Flink app on this API, your Flink app breaks.
> I propose increasing the value of DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS from 
> 10,000 millis (10 sec) to preferably 300,000 (5 min), or at least 60,000 (1 min) 
> if anyone has a solid reason arguing that 5 min is too long.
> This is also related to https://issues.apache.org/jira/browse/FLINK-6365
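
For illustration, the interval can also be raised per job via the consumer 
properties (constant names as in the connector's {{ConsumerConfigConstants}}; 
the stream name and region below are placeholders):

{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// Discover new shards every 5 minutes instead of every 10 seconds.
config.setProperty(
    ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "300000");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "my-stream", new SimpleStringSchema(), config);
{code}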



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125817#comment-16125817
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4532
  
@wuchong @fhueske I hope I addressed all code related issues. Is it ok to 
merge this for now? I will create a follow up issue for the Table to 
DataStream/TableSink conversion case.

> whether we should change the rowtime type when it is an existing field
I think this is a very special case. But it is just a nice addition to make 
the user's life easier. We could also remove the replacing feature as a whole 
to avoid confusion due to the data type conversion. In general, we should get 
rid of `TIMESTAMP` and work on longs as much as possible. In the near future, 
we might also extend the API to use Java 8 `java.time.` equivalents.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

2017-08-14 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4532
  
@wuchong @fhueske I hope I addressed all code related issues. Is it ok to 
merge this for now? I will create a follow up issue for the Table to 
DataStream/TableSink conversion case.

> whether we should change the rowtime type when it is an existing field
I think this is a very special case. But it is just a nice addition to make 
the user's life easier. We could also remove the replacing feature as a whole 
to avoid confusion due to the data type conversion. In general, we should get 
rid of `TIMESTAMP` and work on longs as much as possible. In the near future, 
we might also extend the API to use Java 8 `java.time.` equivalents.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125801#comment-16125801
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132974743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
--- End diff --

Can you open an issue for this? We can discuss this after merging this PR.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132974743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
--- End diff --

Can you open an issue for this? We can discuss this after merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7444) Make external calls non-blocking

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125791#comment-16125791
 ] 

Stephan Ewen commented on FLINK-7444:
-

Is that possibly premature optimization?
A fatal error handler blocking the main thread sounds permissible, because the 
process will anyway not continue normally after that.

Also, direct calls have the advantage of succeeding better in fatal cases. An 
OOM that cannot spawn a thread will not be reported if passed through an 
executor.

Especially for the fatal error handler, I would actually suggest explicitly 
NOT sending it through an executor. 

> Make external calls non-blocking
> 
>
> Key: FLINK-7444
> URL: https://issues.apache.org/jira/browse/FLINK-7444
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> All external calls from a {{RpcEndpoint}} can be potentially blocking, e.g. 
> calls to the {{FatalErrorHandler}}. Therefore, I propose to make all these 
> calls coming from the {{RpcEndpoint's}} main thread non-blocking by running 
> them in an {{Executor}}. That way the main thread will never be blocked.
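
A small sketch of the proposed hand-off (simplified names, not the actual Flink 
classes; per the comment above, the fatal-error path may be better off as a 
direct call):

{code:java}
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EndpointSketch {

    private final Executor externalCallExecutor = Executors.newSingleThreadExecutor();
    private final FatalErrorHandler fatalErrorHandler;

    public EndpointSketch(FatalErrorHandler handler) {
        this.fatalErrorHandler = handler;
    }

    void reportError(Throwable t) {
        // Run the potentially blocking callback off the main thread.
        externalCallExecutor.execute(() -> fatalErrorHandler.onFatalError(t));
    }

    interface FatalErrorHandler {
        void onFatalError(Throwable t);
    }
}
{code}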



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125788#comment-16125788
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132971979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

@fhueske With leftmost table I mean the first time indicator in the select 
statement (from the left). I think even join reordering does not change the column 
ordering. I agree that at least `TableSink`s should deal with it implicitly.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132971979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

@fhueske With leftmost table I mean the first time indicator in the select 
statement (from the left). I think even join reordering does not change the column 
ordering. I agree that at least `TableSink`s should deal with it implicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7365) excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125774#comment-16125774
 ] 

Stephan Ewen commented on FLINK-7365:
-

One issue is that Flink instantiates Hadoop's file systems a bit too naively. 
That results in repeated config loading / parsing.

In the course of improving Flink's handling of file systems and 
handling/loading of Hadoop's file systems, we should make sure that the Hadoop 
file systems are instantiated only once, which ensures that the config is 
parsed once and not per checkpoint (that seems actually a bit too wasteful 
anyway).
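
A sketch of that caching idea (using Hadoop's own API; the cache key is 
simplified to scheme + authority):

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CachedFileSystems {

    private static final ConcurrentHashMap<String, FileSystem> CACHE = new ConcurrentHashMap<>();

    /** Instantiate each Hadoop FileSystem once, so its config is parsed once. */
    public static FileSystem getOrCreate(URI uri, Configuration conf) {
        String key = uri.getScheme() + "://" + uri.getAuthority();
        return CACHE.computeIfAbsent(key, k -> {
            try {
                return FileSystem.newInstance(uri, conf);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
{code}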

> excessive warning logs of attempt to override final parameter: 
> fs.s3.buffer.dir
> ---
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following log in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> - AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on 
> this property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'
> Here's my core-site.xml file:
> {code:xml}
> <?xml version="1.0"?>
> <configuration>
>   <property>
>     <name>fs.s3.buffer.dir</name>
>     <value>/mnt/s3,/mnt1/s3</value>
>     <final>true</final>
>   </property>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3n.impl</name>
>     <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>   </property>
>   <property>
>     <name>ipc.client.connect.max.retries.on.timeouts</name>
>     <value>5</value>
>   </property>
>   <property>
>     <name>hadoop.security.key.default.bitlength</name>
>     <value>256</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.groups</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>hadoop.tmp.dir</name>
>     <value>/mnt/var/lib/hadoop/tmp</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.hosts</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>io.file.buffer.size</name>
>     <value>65536</value>
>   </property>
>   <property>
>     <name>fs.AbstractFileSystem.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3bfs.impl</name>
>     <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
>   </property>
> </configuration>
> {code}
> This bug is about excessive logging.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6692) The flink-dist jar contains unshaded netty jar

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125770#comment-16125770
 ] 

Stephan Ewen commented on FLINK-6692:
-

I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use 
Akka underneath the hood.

I think we can close this, now that we have all {{io.netty.xxx}} out of the way.

[~wheat9] what do you think?

> The flink-dist jar contains unshaded netty jar
> --
>
> Key: FLINK-6692
> URL: https://issues.apache.org/jira/browse/FLINK-6692
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes:
> {noformat}
> io/netty/handler/codec/http/router/
> io/netty/handler/codec/http/router/BadClientSilencer.class
> io/netty/handler/codec/http/router/MethodRouted.class
> io/netty/handler/codec/http/router/Handler.class
> io/netty/handler/codec/http/router/Router.class
> io/netty/handler/codec/http/router/DualMethodRouter.class
> io/netty/handler/codec/http/router/Routed.class
> io/netty/handler/codec/http/router/AbstractHandler.class
> io/netty/handler/codec/http/router/KeepAliveWrite.class
> io/netty/handler/codec/http/router/DualAbstractHandler.class
> io/netty/handler/codec/http/router/MethodRouter.class
> {noformat}
> {noformat}
> org/jboss/netty/util/internal/jzlib/InfBlocks.class
> org/jboss/netty/util/internal/jzlib/InfCodes.class
> org/jboss/netty/util/internal/jzlib/InfTree.class
> org/jboss/netty/util/internal/jzlib/Inflate$1.class
> org/jboss/netty/util/internal/jzlib/Inflate.class
> org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class
> org/jboss/netty/util/internal/jzlib/JZlib.class
> org/jboss/netty/util/internal/jzlib/StaticTree.class
> org/jboss/netty/util/internal/jzlib/Tree.class
> org/jboss/netty/util/internal/jzlib/ZStream$1.class
> org/jboss/netty/util/internal/jzlib/ZStream.class
> {noformat}
> Is it an expected behavior?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7441) Double quote string literals is not supported in Table API and SQL

2017-08-14 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-7441:

Affects Version/s: 1.3.2

> Double quote string literals is not supported in Table API and SQL
> --
>
> Key: FLINK-7441
> URL: https://issues.apache.org/jira/browse/FLINK-7441
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0, 1.3.3
>
>
> Code generation doesn't handle double quote string literals and some control 
> characters which leads to compile error.
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column 
> 48: Expression "hello" is not an rvalue
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7413) Release Hadoop 2.8.x convenience binaries for 1.3.x

2017-08-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125766#comment-16125766
 ] 

Stephan Ewen commented on FLINK-7413:
-

I think we should add these for the 1.4 release.
The concern was only to not introduce this directly before the 1.3 release, 
because new Hadoop versions inevitably introduce new dependency conflicts.

> Release Hadoop 2.8.x convenience binaries for 1.3.x 
> 
>
> Key: FLINK-7413
> URL: https://issues.apache.org/jira/browse/FLINK-7413
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3
>
>
> At least one user on the mailing lists had an issue because Hadoop 2.8.x 
> binaries are not available: 
> https://lists.apache.org/thread.html/c8badc66778144d9d6c3ee5cb23dd732a66cb6690c6867f47f4bd456@%3Cuser.flink.apache.org%3E
> It should be as easy as adding Hadoop 2.8.x to the list of created binaries 
> in the release files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

