[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-11-01 Thread GitBox
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956874
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -73,7 +73,7 @@
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
 
/** The actual Kinesis client from the AWS SDK that we will be using to 
make calls. */
-   private final AmazonKinesis kinesisClient;
+   protected final AmazonKinesis kinesisClient;
 
 Review comment:
   It is used inside `DynamodbStreamsProxy` to [execute the `describeStream` 
call](https://github.com/apache/flink/blob/918eb652a6fa865652b464e742c5f5b6c08c5be6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/proxy/DynamodbStreamsProxy.java#L197).
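   For illustration, a minimal sketch of the kind of subclass usage this change enables — not the PR's actual `DynamodbStreamsProxy` code, and the `protected KinesisProxy(Properties)` constructor is assumed:

```java
import java.util.Properties;

import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;

import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;

/** Hypothetical subclass reusing the now-protected kinesisClient field. */
public class DescribeStreamProxySketch extends KinesisProxy {

	protected DescribeStreamProxySketch(Properties configProps) {
		super(configProps);
	}

	/** Fetches one describeStream page for the given stream via the inherited AWS SDK client. */
	DescribeStreamResult describeStreamOnce(String streamName, String exclusiveStartShardId) {
		DescribeStreamRequest request = new DescribeStreamRequest()
			.withStreamName(streamName)
			.withExclusiveStartShardId(exclusiveStartShardId);
		return kinesisClient.describeStream(request);
	}
}
```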
  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-11-01 Thread GitBox
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229958032
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams.model;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shard handle unit tests.
+ */
+public class DynamodbStreamsShardHandleTest {
+   @Test
+   public void testIsValidShardId() {
+   // normal form
+   String shardId = "shardId-0001536805703746-69688cb1";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // short form
+   shardId = "shardId-0001536805703746";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // long form
+   shardId = 
"shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with wrong prefix
+   shardId = "sId-0001536805703746-69688cb1";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with non-digits
+   shardId = "shardId-000153680570aabb-69688cb1";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with shardId too long
+   shardId = 
"shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+   }
+
+   @Test
+   public void testCompareShardId() {
+   final int numShardIds = 10;
+   final int shardIdDigitLen = 20;
+   final String zeros = "";  // twenty '0' 
chars
+   String shardIdValid = "shardId-0001536805703746-69688cb1";
+   String shardIdInvalid = "shardId-000153680570aabb-69688cb1";
+
+   assertEquals(0, 
DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid));
+
+   // comparison of invalid shardIds should yield exception
+   try {
+   
DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid);
+   fail("invalid shard Id" + shardIdInvalid + " should 
trigger exception");
+   } catch (IllegalArgumentException e) {
+   // ignore
+   }
+   try {
+   
DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
+   fail("invalid shard Id" + shardIdInvalid + " should 
trigger exception");
+   } catch (IllegalArgumentException e) {
+   // ignore
 
 Review comment:
   Yes, `// expected` makes the intent clearer.
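   For reference, the pattern in question as a self-contained JUnit 4 sketch (using `Integer.parseInt` as a stand-in for `compareShardIds`, so the snippet compiles on its own):

```java
import static org.junit.Assert.fail;

import org.junit.Test;

public class ExpectedExceptionPatternTest {

	@Test
	public void invalidInputShouldThrow() {
		try {
			Integer.parseInt("not-a-number");
			fail("parsing a non-numeric string should trigger a NumberFormatException");
		} catch (NumberFormatException e) {
			// expected: the invalid input is rejected, nothing more to assert
		}
	}
}
```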


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-11-01 Thread GitBox
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956951
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   Hmm, both the `dynamodbstreams` and `kinesis` packages are under the parent 
package `org.apache.flink.streaming.connectors`, which is also consistent with 
the directory structure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671197#comment-16671197
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229958032
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams.model;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shard handle unit tests.
+ */
+public class DynamodbStreamsShardHandleTest {
+   @Test
+   public void testIsValidShardId() {
+   // normal form
+   String shardId = "shardId-0001536805703746-69688cb1";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // short form
+   shardId = "shardId-0001536805703746";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // long form
+   shardId = 
"shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+   assertEquals(true, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with wrong prefix
+   shardId = "sId-0001536805703746-69688cb1";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with non-digits
+   shardId = "shardId-000153680570aabb-69688cb1";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+   // invalid with shardId too long
+   shardId = 
"shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+   assertEquals(false, 
DynamodbStreamsShardHandle.isValidShardId(shardId));
+   }
+
+   @Test
+   public void testCompareShardId() {
+   final int numShardIds = 10;
+   final int shardIdDigitLen = 20;
+   final String zeros = "";  // twenty '0' 
chars
+   String shardIdValid = "shardId-0001536805703746-69688cb1";
+   String shardIdInvalid = "shardId-000153680570aabb-69688cb1";
+
+   assertEquals(0, 
DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid));
+
+   // comparison of invalid shardIds should yield exception
+   try {
+   
DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid);
+   fail("invalid shard Id" + shardIdInvalid + " should 
trigger exception");
+   } catch (IllegalArgumentException e) {
+   // ignore
+   }
+   try {
+   
DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
+   fail("invalid shard Id" + shardIdInvalid + " should 
trigger exception");
+   } catch (IllegalArgumentException e) {
+   // ignore
 
 Review comment:
   Yes, `// expected` makes the intent clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://i

[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671195#comment-16671195
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956874
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -73,7 +73,7 @@
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
 
/** The actual Kinesis client from the AWS SDK that we will be using to 
make calls. */
-   private final AmazonKinesis kinesisClient;
+   protected final AmazonKinesis kinesisClient;
 
 Review comment:
   It is used inside `DynamodbStreamsProxy` to [execute the `describeStream` 
call](https://github.com/apache/flink/blob/918eb652a6fa865652b464e742c5f5b6c08c5be6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/proxy/DynamodbStreamsProxy.java#L197).
  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.
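
A hypothetical usage sketch of the connector from PR #6968, shown for illustration only: the FlinkDynamodbStreamsConsumer constructor is assumed to mirror FlinkKinesisConsumer(String, DeserializationSchema, Properties), and the stream identifier and region are placeholders.

{code}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.dynamodbstreams.FlinkDynamodbStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DynamodbStreamsCdcJobSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

		// Placeholder identifier for the DynamoDB table's stream.
		DataStream<String> dynamoItemsCdc = env.addSource(
			new FlinkDynamodbStreamsConsumer<>("my-dynamodb-stream", new SimpleStringSchema(), config));

		dynamoItemsCdc.print();
		env.execute("DynamoDB Streams CDC sketch");
	}
}
{code}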



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671196#comment-16671196
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956951
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   Hmm, both the `dynamodbstreams` and `kinesis` packages are under the parent 
package `org.apache.flink.streaming.connectors`, which is also consistent with 
the directory structure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10231) Add a view SQL DDL

2018-11-01 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671204#comment-16671204
 ] 

Shuyi Chen commented on FLINK-10231:


I somewhat agree that registerTable could be replaced with registerView, but 
registerTableInternal is used for registering both virtual tables and real 
tables, so I think it can stay. What do you think, [~fhueske]?

Also, [~winipanda], the community is finalizing the DDL design doc and will 
share it soon. You are welcome to comment on it :)

> Add a view SQL DDL
> --
>
> Key: FLINK-10231
> URL: https://issues.apache.org/jira/browse/FLINK-10231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: winifredtang
>Priority: Major
>
> FLINK-10163 added initial view support for the SQL Client. However, for 
> supporting the [full definition of 
> views|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView]
>  (with schema, comments, etc.) we need to support native support for views in 
> the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-11-01 Thread GitBox
yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-434959900
 
 
   PTAL @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671214#comment-16671214
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-434959900
 
 
   PTAL @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671216#comment-16671216
 ] 

Ying Xu commented on FLINK-4582:


[~tinder-dthomson] Posted the PR here:  
https://github.com/apache/flink/pull/6968

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u closed pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink

2018-11-01 Thread GitBox
kl0u closed pull request #6520: [FLINK-10097][DataStream API] Additional tests 
for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 6187e6853dd..65a7628578c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
@@ -26,6 +27,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -59,10 +62,11 @@
 
private final RollingPolicy rollingPolicy;
 
-   private final Map> 
pendingPartsPerCheckpoint = new HashMap<>();
+   private final Map> 
pendingPartsPerCheckpoint;
 
private long partCounter;
 
+   @Nullable
private PartFileWriter inProgressPart;
 
private List 
pendingPartsForCurrentCheckpoint;
@@ -88,6 +92,7 @@ private Bucket(
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 
this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
+   this.pendingPartsPerCheckpoint = new HashMap<>();
}
 
/**
@@ -277,6 +282,24 @@ void onProcessingTime(long timestamp) throws IOException {
}
}
 
+   // --- Testing Methods 
-
+
+   @VisibleForTesting
+   Map> 
getPendingPartsPerCheckpoint() {
+   return pendingPartsPerCheckpoint;
+   }
+
+   @Nullable
+   @VisibleForTesting
+   PartFileWriter getInProgressPart() {
+   return inProgressPart;
+   }
+
+   @VisibleForTesting
+   List 
getPendingPartsForCurrentCheckpoint() {
+   return pendingPartsForCurrentCheckpoint;
+   }
+
// --- Static Factory Methods 
-
 
/**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 2aca841f16d..d08bc2ac0c3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -186,6 +187,10 @@ private void handleRestoredBucketState(final 
BucketState recoveredStat
}
 
private void updateActiveBucketId(final BucketID bucketId, final 
Bucket restoredBucket) throws IOException {
+   if (!restoredBucket.isActive()) {
+   return;
+   }
+
final Bucket bucket = activeBuckets.get(bucketId);
if (bucket != null) {
bucket.merge(restoredBucket);
@@ -224,6 +229,9 @@ void snapshotState(
LOG.info("Subtask {} checkpointing for checkpoint with id={} 
(max part counter={}).",
subtaskIndex, checkpointId, maxPartCounter);
 
+   bucketStatesContainer.clear();
+   partCounterStateContainer.clear();
+
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
partCounterStateContainer.add(maxPartCounter);
}
@@ -341,4 +349,16 @@ public Long timestamp() {
return elementTimestamp;
}
}
+
+   // --- Testing Methods 
-
+
+   @VisibleForTesting
+   public long getMaxPartCounter() {
+   return maxPartCounter;
+   }
+
+   @VisibleForTesting
+   Map> getActiveBuckets() {
+   retu

[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671233#comment-16671233
 ] 

ASF GitHub Bot commented on FLINK-10097:


kl0u closed pull request #6520: [FLINK-10097][DataStream API] Additional tests 
for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 6187e6853dd..65a7628578c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
@@ -26,6 +27,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -59,10 +62,11 @@
 
private final RollingPolicy rollingPolicy;
 
-   private final Map> 
pendingPartsPerCheckpoint = new HashMap<>();
+   private final Map> 
pendingPartsPerCheckpoint;
 
private long partCounter;
 
+   @Nullable
private PartFileWriter inProgressPart;
 
private List 
pendingPartsForCurrentCheckpoint;
@@ -88,6 +92,7 @@ private Bucket(
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 
this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
+   this.pendingPartsPerCheckpoint = new HashMap<>();
}
 
/**
@@ -277,6 +282,24 @@ void onProcessingTime(long timestamp) throws IOException {
}
}
 
+   // --- Testing Methods 
-
+
+   @VisibleForTesting
+   Map> 
getPendingPartsPerCheckpoint() {
+   return pendingPartsPerCheckpoint;
+   }
+
+   @Nullable
+   @VisibleForTesting
+   PartFileWriter getInProgressPart() {
+   return inProgressPart;
+   }
+
+   @VisibleForTesting
+   List 
getPendingPartsForCurrentCheckpoint() {
+   return pendingPartsForCurrentCheckpoint;
+   }
+
// --- Static Factory Methods 
-
 
/**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 2aca841f16d..d08bc2ac0c3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -186,6 +187,10 @@ private void handleRestoredBucketState(final 
BucketState recoveredStat
}
 
private void updateActiveBucketId(final BucketID bucketId, final 
Bucket restoredBucket) throws IOException {
+   if (!restoredBucket.isActive()) {
+   return;
+   }
+
final Bucket bucket = activeBuckets.get(bucketId);
if (bucket != null) {
bucket.merge(restoredBucket);
@@ -224,6 +229,9 @@ void snapshotState(
LOG.info("Subtask {} checkpointing for checkpoint with id={} 
(max part counter={}).",
subtaskIndex, checkpointId, maxPartCounter);
 
+   bucketStatesContainer.clear();
+   partCounterStateContainer.clear();
+
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
partCounterStateContainer.add(maxPartCounter);
}
@@ -341,4 +349,16 @@ public Long timestamp() {
return elementTimestamp;
}
}
+
+   // --

[jira] [Updated] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor

2018-11-01 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8625:
--
Description: 
This will allow us to trigger/schedule next flush only if we are not currently 
busy. 

PR: https://github.com/apache/flink/pull/6698

  was:This will allow us to trigger/schedule next flush only if we are not 
currently busy. 


> Move OutputFlusher thread to Netty scheduled executor
> -
>
> Key: FLINK-8625
> URL: https://issues.apache.org/jira/browse/FLINK-8625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> This will allow us to trigger/schedule next flush only if we are not 
> currently busy. 
> PR: https://github.com/apache/flink/pull/6698



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10638) Invalid table scan resolution for temporal join queries

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10638:
--
Fix Version/s: 1.7.0

> Invalid table scan resolution for temporal join queries
> ---
>
> Key: FLINK-10638
> URL: https://issues.apache.org/jira/browse/FLINK-10638
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Registered tables that contain a temporal join are not properly resolved when 
> performing a table scan.
> A planning error occurs when registering a table with a temporal join and 
> reading from it again.
> {code}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{2}])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   
> LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) 
> NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME 
> ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10693) Fix Scala EitherSerializer duplication

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10693.
---
   Resolution: Fixed
Fix Version/s: 1.6.3
   1.5.6

Fixed via
1.7.0: 68df7f969c898bbc7ca348e917fd5fc40a123faf
1.6.3: 5aa07cd277bad087bbcf901afaac6133c87b6812
1.5.6: 26fa2e5f75f25ab5723fe79ce1a938ba94a6337b

> Fix Scala EitherSerializer duplication
> --
>
> Key: FLINK-10693
> URL: https://issues.apache.org/jira/browse/FLINK-10693
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 1.6.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The Scala Either Serializer has buggy duplication logic, resulting in sharing 
> and incorrect concurrent use when the nested serializers are not thread safe.
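
A minimal sketch of the duplication contract involved, independent of Flink's actual TypeSerializer API: a composite serializer must duplicate its nested (possibly stateful) serializers and may only return the shared instance when neither nested serializer produced a copy.

{code}
/** Toy stand-in for a duplicable, possibly stateful serializer. */
interface Duplicable<T extends Duplicable<T>> {
	T duplicate();
}

final class EitherLikeSerializer<L extends Duplicable<L>, R extends Duplicable<R>>
		implements Duplicable<EitherLikeSerializer<L, R>> {

	private final L leftSerializer;
	private final R rightSerializer;

	EitherLikeSerializer(L leftSerializer, R rightSerializer) {
		this.leftSerializer = leftSerializer;
		this.rightSerializer = rightSerializer;
	}

	@Override
	public EitherLikeSerializer<L, R> duplicate() {
		L leftDup = leftSerializer.duplicate();
		R rightDup = rightSerializer.duplicate();
		// Sharing `this` is only safe if neither nested serializer needed its own copy.
		if (leftDup == leftSerializer && rightDup == rightSerializer) {
			return this;
		}
		return new EitherLikeSerializer<>(leftDup, rightDup);
	}
}
{code}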



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6951: [FLINK-10693] [Scala API] Fix incorrect duplication in EitherSerializer

2018-11-01 Thread GitBox
asfgit closed pull request #6951: [FLINK-10693] [Scala API] Fix incorrect 
duplication in EitherSerializer
URL: https://github.com/apache/flink/pull/6951
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
index 9566a663169..8ad4e09a1e2 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
@@ -27,8 +27,8 @@
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerConfigSnapshot, L, R>
-   extends CompositeTypeSerializerConfigSnapshot {
+public class ScalaEitherSerializerConfigSnapshot
+   extends CompositeTypeSerializerConfigSnapshot> {
 
private static final int VERSION = 1;
 
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 82637befdcd..14f2196b9cc 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -27,15 +27,24 @@ import org.apache.flink.core.memory.{DataInputView, 
DataOutputView}
  */
 @Internal
 @SerialVersionUID(9219995873023657525L)
-class EitherSerializer[A, B, T <: Either[A, B]](
+class EitherSerializer[A, B](
 val leftSerializer: TypeSerializer[A],
 val rightSerializer: TypeSerializer[B])
-  extends TypeSerializer[T] {
+  extends TypeSerializer[Either[A, B]] {
 
-  override def duplicate: EitherSerializer[A,B,T] = this
+  override def duplicate: EitherSerializer[A,B] = {
+val leftDup = leftSerializer.duplicate()
+val rightDup = rightSerializer.duplicate()
 
-  override def createInstance: T = {
-Left(null).asInstanceOf[T]
+if (leftDup.eq(leftSerializer) && rightDup.eq(rightSerializer)) {
+  this
+} else {
+  new EitherSerializer[A, B](leftDup, rightDup)
+}
+  }
+
+  override def createInstance: Either[A, B] = {
+Left(null).asInstanceOf[Left[A, B]]
   }
 
   override def isImmutableType: Boolean = {
@@ -45,12 +54,12 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 
   override def getLength: Int = -1
 
-  override def copy(from: T): T = from match {
-case Left(a) => Left(leftSerializer.copy(a)).asInstanceOf[T]
-case Right(b) => Right(rightSerializer.copy(b)).asInstanceOf[T]
+  override def copy(from: Either[A, B]): Either[A, B] = from match {
+case Left(a) => Left(leftSerializer.copy(a))
+case Right(b) => Right(rightSerializer.copy(b))
   }
 
-  override def copy(from: T, reuse: T): T = copy(from)
+  override def copy(from: Either[A, B], reuse: Either[A, B]): Either[A, B] = 
copy(from)
 
   override def copy(source: DataInputView, target: DataOutputView): Unit = {
 val isLeft = source.readBoolean()
@@ -62,7 +71,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 }
   }
 
-  override def serialize(either: T, target: DataOutputView): Unit = either 
match {
+  override def serialize(either: Either[A, B], target: DataOutputView): Unit = 
either match {
 case Left(a) =>
   target.writeBoolean(true)
   leftSerializer.serialize(a, target)
@@ -71,27 +80,27 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   rightSerializer.serialize(b, target)
   }
 
-  override def deserialize(source: DataInputView): T = {
+  override def deserialize(source: DataInputView): Either[A, B] = {
 val isLeft = source.readBoolean()
 if (isLeft) {
-  Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+  Left(leftSerializer.deserialize(source))
 } else {
-  Right(rightSerializer.deserialize(source)).asInstanceOf[T]
+  Right(rightSerializer.deserialize(source))
 }
   }
 
-  override def deserialize(reuse: T, source: DataInputView): T = {
+  override def deserialize(reuse: Either[A, B], source: DataInputView): 
Either[A, B] = {
 val isLeft = source.readBoolean()
 if (isLeft) {
-  Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+  Left(leftSerializer.deserialize(source))
 } else {
-  Right(rightSerializer.deserialize(source)).asInstanceOf[T]
+  Right(rightSerializer.deserialize(source))
 }
   }
 
   override def equals(obj: Any): Boolean = {
 obj match {
-  case eitherSerializer: EitherSer

[jira] [Commented] (FLINK-10693) Fix Scala EitherSerializer duplication

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671272#comment-16671272
 ] 

ASF GitHub Bot commented on FLINK-10693:


asfgit closed pull request #6951: [FLINK-10693] [Scala API] Fix incorrect 
duplication in EitherSerializer
URL: https://github.com/apache/flink/pull/6951
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
index 9566a663169..8ad4e09a1e2 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
@@ -27,8 +27,8 @@
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerConfigSnapshot, L, R>
-   extends CompositeTypeSerializerConfigSnapshot {
+public class ScalaEitherSerializerConfigSnapshot
+   extends CompositeTypeSerializerConfigSnapshot> {
 
private static final int VERSION = 1;
 
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 82637befdcd..14f2196b9cc 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -27,15 +27,24 @@ import org.apache.flink.core.memory.{DataInputView, 
DataOutputView}
  */
 @Internal
 @SerialVersionUID(9219995873023657525L)
-class EitherSerializer[A, B, T <: Either[A, B]](
+class EitherSerializer[A, B](
 val leftSerializer: TypeSerializer[A],
 val rightSerializer: TypeSerializer[B])
-  extends TypeSerializer[T] {
+  extends TypeSerializer[Either[A, B]] {
 
-  override def duplicate: EitherSerializer[A,B,T] = this
+  override def duplicate: EitherSerializer[A,B] = {
+val leftDup = leftSerializer.duplicate()
+val rightDup = rightSerializer.duplicate()
 
-  override def createInstance: T = {
-Left(null).asInstanceOf[T]
+if (leftDup.eq(leftSerializer) && rightDup.eq(rightSerializer)) {
+  this
+} else {
+  new EitherSerializer[A, B](leftDup, rightDup)
+}
+  }
+
+  override def createInstance: Either[A, B] = {
+Left(null).asInstanceOf[Left[A, B]]
   }
 
   override def isImmutableType: Boolean = {
@@ -45,12 +54,12 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 
   override def getLength: Int = -1
 
-  override def copy(from: T): T = from match {
-case Left(a) => Left(leftSerializer.copy(a)).asInstanceOf[T]
-case Right(b) => Right(rightSerializer.copy(b)).asInstanceOf[T]
+  override def copy(from: Either[A, B]): Either[A, B] = from match {
+case Left(a) => Left(leftSerializer.copy(a))
+case Right(b) => Right(rightSerializer.copy(b))
   }
 
-  override def copy(from: T, reuse: T): T = copy(from)
+  override def copy(from: Either[A, B], reuse: Either[A, B]): Either[A, B] = 
copy(from)
 
   override def copy(source: DataInputView, target: DataOutputView): Unit = {
 val isLeft = source.readBoolean()
@@ -62,7 +71,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 }
   }
 
-  override def serialize(either: T, target: DataOutputView): Unit = either 
match {
+  override def serialize(either: Either[A, B], target: DataOutputView): Unit = 
either match {
 case Left(a) =>
   target.writeBoolean(true)
   leftSerializer.serialize(a, target)
@@ -71,27 +80,27 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   rightSerializer.serialize(b, target)
   }
 
-  override def deserialize(source: DataInputView): T = {
+  override def deserialize(source: DataInputView): Either[A, B] = {
 val isLeft = source.readBoolean()
 if (isLeft) {
-  Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+  Left(leftSerializer.deserialize(source))
 } else {
-  Right(rightSerializer.deserialize(source)).asInstanceOf[T]
+  Right(rightSerializer.deserialize(source))
 }
   }
 
-  override def deserialize(reuse: T, source: DataInputView): T = {
+  override def deserialize(reuse: Either[A, B], source: DataInputView): 
Either[A, B] = {
 val isLeft = source.readBoolean()
 if (isLeft) {
-  Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+  Left(leftSerializer.deseria

[jira] [Commented] (FLINK-10731) Support AVG on Date fields

2018-11-01 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671273#comment-16671273
 ] 

Timo Walther commented on FLINK-10731:
--

[~f.pompermaier] so can we close this issue or rename it to a {{MEDIAN}} 
function?

> Support AVG on Date fields
> --
>
> Key: FLINK-10731
> URL: https://issues.apache.org/jira/browse/FLINK-10731
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Minor
>
> AVG function does not work on date fields right now



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10737) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10737:
--
Priority: Blocker  (was: Critical)

> FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis
> 
>
> Key: FLINK-10737
> URL: https://issues.apache.org/jira/browse/FLINK-10737
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The {{FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint}} failed on 
> Travis:
> https://api.travis-ci.org/v3/job/448781612/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10738) FlinkKafkaProducerITCase.testScaleUpAfterScalingDown failed on Travis

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10738:
--
Priority: Blocker  (was: Critical)

> FlinkKafkaProducerITCase.testScaleUpAfterScalingDown failed on Travis
> -
>
> Key: FLINK-10738
> URL: https://issues.apache.org/jira/browse/FLINK-10738
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The {{FlinkKafkaProducerITCase.testScaleUpAfterScalingDown}} failed on 
> Travis: https://api.travis-ci.org/v3/job/448781612/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229973793
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${ssl_dir}/node.pem"
 
 Review comment:
   Oh boy, I'm not so sure about this one; if the SSL logic in `common.sh` and 
`common_ssl.sh` is coupled like this, I'd rather have them in the same file. I 
believe so far we managed to keep the `*common*` relatively independe

[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229971488
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
 
 Review comment:
   function seems unnecessary


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229971085
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
 
 Review comment:
   what does this do and where does it come from? It isn't documented in the 
[SSL 
docs](https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229972405
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${ssl_dir}/node.pem"
+}
+
+function set_conf_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
 
 Review comment:
   is any test actually using the `server` setting? (if not then I'd say we 
should only modify 1 test to use mutual.)

-

[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671279#comment-16671279
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229973793
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${

[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671278#comment-16671278
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229971488
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
 
 Review comment:
   function seems unnecessary


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add end-to-end test for REST communication encryption
> -
>
> Key: FLINK-10628
> URL: https://issues.apache.org/jira/browse/FLINK-10628
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann

[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671276#comment-16671276
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229971085
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
 
 Review comment:
   what does this do and where does it come from? It isn't documented in the 
[SSL 
docs](https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add end-to-end test for REST communication encryption
> -
>
> Key: FLINK-10628
> URL: https://issues.apache.org/jira/browse/FLINK-10628
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10371 we added support for mutual authentication for the REST 
> communication. We should adapt one of the existing end-to-end tests to 
> require this feature for the REST communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671277#comment-16671277
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229972405
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${

[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges

2018-11-01 Thread GitBox
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-434975263
 
 
   @zentol  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671287#comment-16671287
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-434975263
 
 
   @zentol  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. Similarly to 
> FLINK-10251, we should check whether the payload exceeds the maximum 
> framesize and fail fast if it does.
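
A minimal sketch of such a fail-fast check (the class and method names below are 
assumptions for illustration, not the actual MetricQueryService API):

{code:java}
// Minimal sketch of a fail-fast size check before sending a serialized metric
// dump over Akka. Class and method names are assumptions for illustration,
// not the actual MetricQueryService API.
public final class MetricDumpSizeCheck {

    private MetricDumpSizeCheck() {
    }

    /**
     * Fails fast if the serialized payload would exceed the configured maximum
     * Akka frame size, instead of letting the actor send fail later.
     */
    public static void checkSize(byte[] serializedMetrics, long maxFrameSizeBytes) {
        if (serializedMetrics.length > maxFrameSizeBytes) {
            throw new IllegalStateException(
                    "Metric dump of " + serializedMetrics.length
                            + " bytes exceeds the maximum frame size of "
                            + maxFrameSizeBytes + " bytes (akka.framesize).");
        }
    }
}
{code}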



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10745) Improve record writer and serializer stack

2018-11-01 Thread zhijiang (JIRA)
zhijiang created FLINK-10745:


 Summary: Improve record writer and serializer stack
 Key: FLINK-10745
 URL: https://issues.apache.org/jira/browse/FLINK-10745
 Project: Flink
  Issue Type: Improvement
  Components: Network
Affects Versions: 1.8.0
Reporter: zhijiang
Assignee: zhijiang


This is the umbrella issue for tracking the improvements of the record writer and 
serialization stack.

 

There are mainly three components involved:

1. {{ChannelSelector}}: Refactor the interface method and unify the 
implementations of the stream and batch modes.

2. {{RecordSerializer}}: Redesign the serialization process so that a record is 
serialized only once and merely copied for multiple selected channels.

3. {{RecordWriter}}: Redesign the emit process to build on the serialization 
improvement above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10321) Make the condition of broadcast partitioner simple

2018-11-01 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-10321:
-
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-10745

> Make the condition of broadcast partitioner simple
> --
>
> Key: FLINK-10321
> URL: https://issues.apache.org/jira/browse/FLINK-10321
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> The current {{BroadcastPartitioner}} uses the {{set}} and {{setNumber}} 
> variables as the condition for returning the channel array.
> Instead of using {{set}} and {{setNumber}}, we can just check whether 
> {{returnChannel.length == numberOfOutputChannels}} as the condition, which 
> keeps it simple.
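
A rough sketch of the simplified condition (the class and field names are 
assumptions for illustration, not the actual partitioner code):

{code:java}
// Rough sketch of the simplified condition: rebuild the broadcast channel
// array only when the number of output channels changes, instead of tracking
// separate 'set'/'setNumber' flags. Names are assumptions for illustration.
public class BroadcastPartitionerSketch<T> {

    private int[] returnChannels = new int[0];

    public int[] selectChannels(T record, int numberOfOutputChannels) {
        if (returnChannels.length != numberOfOutputChannels) {
            returnChannels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannels[i] = i;
            }
        }
        return returnChannels;
    }
}
{code}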



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-01 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-10662:
-
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-10745

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of the broadcast improvement, [~pnowojski] pointed out the 
> opportunity to improve the current channel selector.
>  
> {{ChannelSelector#selectChannels}} returns an array of selected channels. But 
> looking at the specific implementations, only {{BroadcastPartitioner}} selects 
> all channels; the other implementations select exactly one channel. So we can 
> simplify this interface to return a single channel index, which benefits 
> performance, and specialize the {{BroadcastPartitioner}} in a more efficient 
> way.
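
A minimal sketch of what such a single-channel selector could look like, assuming 
hypothetical names rather than the final Flink API:

{code:java}
// Hypothetical single-channel selector interface; the real Flink signatures
// may differ. Broadcast would be specialized separately instead of going
// through this interface.
public interface SingleChannelSelector<T> {

    /** Returns the index of the single channel the record should be sent to. */
    int selectChannel(T record, int numberOfOutputChannels);
}

// Example implementation: round-robin over all output channels.
class RoundRobinSelector<T> implements SingleChannelSelector<T> {

    private int nextChannel = -1;

    @Override
    public int selectChannel(T record, int numberOfOutputChannels) {
        nextChannel = (nextChannel + 1) % numberOfOutputChannels;
        return nextChannel;
    }
}
{code}

With an interface like this, only the broadcast case would still need the 
array-returning path, which matches the specialization mentioned above.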



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-11-01 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-9913:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-10745

> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output is serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by serializing each record only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in the {{RecordWriter}} for all 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all selected channels.
>  # The intermediate serialization result is copied into a different 
> {{BufferBuilder}} for each selected channel.
> An additional benefit of using a single serializer for all channels is that 
> we get a potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).
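
A rough sketch of the serialize-once idea behind these changes; all type and 
method names are assumptions for illustration, not Flink's actual internals:

{code:java}
import java.nio.ByteBuffer;

// Rough sketch of the serialize-once idea: the record is serialized into one
// intermediate buffer, and only the resulting bytes are copied into the
// per-channel BufferBuilders. All names here are assumptions for illustration.
class SerializeOnceRecordWriterSketch {

    interface BufferBuilderLike {
        void append(ByteBuffer serializedRecord);
    }

    void emit(byte[] serializedRecord, BufferBuilderLike[] channelBuilders, int[] targetChannels) {
        // serialize once (here the record is already given as bytes), wrap it once ...
        ByteBuffer serialized = ByteBuffer.wrap(serializedRecord);
        for (int channel : targetChannels) {
            // ... and give every selected channel its own independent read
            // position over the same underlying bytes.
            channelBuilders[channel].append(serialized.duplicate());
        }
    }
}
{code}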



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10689) Port Table API extension points to flink-table-common

2018-11-01 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671291#comment-16671291
 ] 

Timo Walther commented on FLINK-10689:
--

Yes, it would be very helpful if we could parallelize the subtasks. However, 
many issues about catalogs, connectors, and the Hive integration mutually 
depend on each other, so we need to be careful which tasks to do first in order 
to keep duplicate work and merge conflicts low.

We could already port {{org.apache.flink.table.functions}} without waiting for 
FLINK-10688. However, porting {{org.apache.flink.table.catalog}} is a bit more 
tricky as it depends on the source and sink interfaces (thus FLINK-10688). The 
external catalog builder depends on 
{{org.apache.flink.table.descriptors.Schema}}, which depends on 
{{org.apache.flink.table.descriptors.Rowtime}}, which depends on 
{{org.apache.flink.table.sources.tsextractors.TimestampExtractor}}, which pulls 
in {{Expression}} and thus the entire Table API, which does not belong in a 
{{common}} module. Once we are confident about how the improved [unified connector 
interfaces|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf]
 will look, we might need to rework the table source/sink interfaces as well as 
utility classes such as {{TimestampExtractor}}s. IMHO this has the highest 
priority once the 1.7 release is out.

Feel free to start porting everything that clearly belongs into a {{common}} 
module and does not pull in unrelated classes, if you see a possibility for it.

> Port Table API extension points to flink-table-common
> -
>
> Key: FLINK-10689
> URL: https://issues.apache.org/jira/browse/FLINK-10689
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>
> After FLINK-10687 and FLINK-10688 have been resolved, we should also port the 
> remaining extension points of the Table API to flink-table-common. This 
> includes interfaces for UDFs and the external catalog interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10537) Network throughput performance regression after broadcast changes

2018-11-01 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-10537:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-10745

> Network throughput performance regression after broadcast changes
> -
>
> Key: FLINK-10537
> URL: https://issues.apache.org/jira/browse/FLINK-10537
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> There is a slight network throughput regression introduced in: 
> https://issues.apache.org/jira/browse/FLINK-9913
> It is visible in the following benchmark:
> [http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=networkThroughput.1,100ms&env=2&revs=200&equid=off&quarts=on&extr=on]
> (drop in the chart that happened since 21st September.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2018-11-01 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671319#comment-16671319
 ] 

Maximilian Michels commented on FLINK-10672:


Thanks for investigating the matter [~angoenka].

This seems like an issue with the deadlock detection code. It's not a blocker 
for us anymore because we have the {{BATCH_FORCED}} workaround. We will 
continue to investigate the pipelined execution issue.

The TFX pipeline makes heavy use of broadcast variables and there is a 
suspicion these might not be scheduled correctly under certain conditions.

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (on the order of 10kb input) but gets 
> stuck with slightly larger data (300kb input).
>  
> The task gets stuck while writing the output to Flink; more specifically, it 
> gets stuck while requesting a memory segment in the local buffer pool. The 
> TaskManager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.j

[jira] [Closed] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-10097.
--
   Resolution: Fixed
Fix Version/s: (was: 1.6.3)
   1.7.0

Merged on master with 9ad932c52056d0ed7e96e1c385403a529e6a8edb

> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-11-01 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-10672:
---
Fix Version/s: 1.5.6

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Fix For: 1.5.6
>
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (on the order of 10kb input) but gets 
> stuck with slightly larger data (300kb input).
>  
> The task gets stuck while writing the output to Flink; more specifically, it 
> gets stuck while requesting a memory segment in the local buffer pool. The 
> TaskManager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>  at 
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessa

[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink

2018-11-01 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-10672:
---
Labels: beam  (was: )

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Fix For: 1.5.6
>
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (on the order of 10kb input) but gets 
> stuck with slightly larger data (300kb input).
>  
> The task gets stuck while writing the output to Flink; more specifically, it 
> gets stuck while requesting a memory segment in the local buffer pool. The 
> TaskManager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>  at 
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMess

[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671329#comment-16671329
 ] 

vinoyang commented on FLINK-10704:
--

[~pnowojski] [~twalthr] A strange exception : 
{code:java}
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
The configured environment is invalid. Please check your environment files 
again.
 at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
 at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:485)
 at 
org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:313)
 at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
 ... 2 more
Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/table/factories/TableFormatFactoryBase
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
 at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
 at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
 at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
 at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
 at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
 at scala.collection.Iterator$$anonfun$toStream$1.apply(Iterator.scala:1320)
 at scala.collection.Iterator$$anonfun$toStream$1.apply(Iterator.scala:1320)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
 at scala.collection.immutable.Stream.filter(Stream.scala:519)
 at 
scala.collection.immutable.Stream$$anonfun$filteredTail$1.apply(Stream.scala:1299)
 at 
scala.collection.immutable.Stream$$anonfun$filteredTail$1.apply(Stream.scala:1299)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
 at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
 at scala.collection.immutable.Stream.filter(Stream.scala:519)
 at scala.collection.immutable.Stream.filter(Stream.scala:202)
 at 
org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:197)
 at 
org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
 at 
org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
 at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
 at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:481)
 ... 4 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.factories.TableFormatFactoryBase
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 43 more
Waiting for CSV results...
{code}
I did not find any usage of TableFormatFactoryBase anywhere. What is the reason 
for this exception? I have updated the specific client and table jars and 
sql-client-defaults.yaml in the *build-target* dir, but it still reports this 
exception.
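
If it helps to narrow this down, listing the contents of the deployed jars shows whether the class made it into them at all. A minimal sketch (the jar names below are assumptions; use whatever is actually in your *build-target* lib/opt dirs):
{code:bash}
# Hypothetical jar names -- adjust to the artifacts in build-target/lib and build-target/opt.
cd build-target
jar tf lib/flink-table_2.11-1.7-SNAPSHOT.jar | grep TableFormatFactoryBase
jar tf opt/flink-sql-client_2.11-1.7-SNAPSHOT.jar | grep TableFormatFactoryBase
# No output from either command means the class is missing on the SQL client classpath,
# which would explain the NoClassDefFoundError above.
{code}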

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vi

[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671336#comment-16671336
 ] 

Timo Walther commented on FLINK-10704:
--

Have you rebased your branch onto the current master? I merged a fix yesterday 
because {{flink-table-common}} was not part of the {{flink-table}} jar.
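
A minimal sketch of picking that fix up (assuming a git remote named {{upstream}} pointing at apache/flink; the module paths are assumptions for the 1.7 layout):
{code:bash}
# Rebase onto the latest master and rebuild the table/SQL client modules plus their dependencies,
# so that flink-table-common ends up in the rebuilt flink-table jar.
git fetch upstream
git rebase upstream/master
mvn clean install -DskipTests -pl flink-libraries/flink-table,flink-libraries/flink-sql-client -am
{code}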

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filter>
>    <artifact>*:*</artifact>
>    <excludes>
>       <exclude>kafka/kafka-version.properties</exclude>
>    </excludes>
> </filter>
> {code}
> When the shell script scans the logs for the "error" keyword with grep, this warning is a hit, so the test 
> will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}
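
One possible mitigation (an assumption on my side, not necessarily what the eventual fix does) would be to whitelist this particular warning before counting "error" hits, roughly like:
{code:bash}
#!/usr/bin/env bash
# Hypothetical tweak to the check above: ignore the shaded Kafka client's
# kafka-version.properties warning before counting "error" occurrences.
FLINK_DIR=${FLINK_DIR:-build-target}
error_count=$(grep -ri "error" "$FLINK_DIR/log" \
  | grep -v "Error while loading kafka-version.properties" \
  | wc -l)
echo "error hits after whitelisting: ${error_count}"
{code}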



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229975091
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   I don't think that you need this dependency here at all. It definitely 
shouldn't be here, since we do not expect our users to include it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671344#comment-16671344
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229983121
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-kafka-test-base/src/main/java/org/apache/flink/kafka/tests/KafkaEvent.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka.tests;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * This is a Java POJO, which Flink recognizes and will allow "by-name" 
field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} 
of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
 
 Review comment:
   I have talked with @tzulitai and we concluded that to avoid this duplication 
it would be best to:
   
   - Move submodule (and rename) `flink-kafka-test-base` to 
`flink-streaming-examples/flink-kafka-example-base`
   - Kafka version specifics (`flink-streaming-examples/flink-kafka-example` 
and `flink-streaming-examples/flink-kafka-0.10-example`) extend that base
   - End to end tests just use the example jobs for testing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229983121
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-kafka-test-base/src/main/java/org/apache/flink/kafka/tests/KafkaEvent.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka.tests;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * This is a Java POJO, which Flink recognizes and will allow "by-name" 
field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} 
of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
 
 Review comment:
   I have talked with @tzulitai and we concluded that to avoid this duplication 
it would be best to:
   
   - Move submodule (and rename) `flink-kafka-test-base` to 
`flink-streaming-examples/flink-kafka-example-base`
   - Kafka version specifics (`flink-streaming-examples/flink-kafka-example` 
and `flink-streaming-examples/flink-kafka-0.10-example`) extend that base
   - End to end tests just use the example jobs for testing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671343#comment-16671343
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229975091
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   I don't think that you need this dependency here at all. It definitely 
shouldn't be here, since we do not expect our users to include it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671342#comment-16671342
 ] 

vinoyang commented on FLINK-10704:
--

[~twalthr] Oh, I did not. I will try it.

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filter>
>    <artifact>*:*</artifact>
>    <excludes>
>       <exclude>kafka/kafka-version.properties</exclude>
>    </excludes>
> </filter>
> {code}
> When the shell script scans the logs for the "error" keyword with grep, this warning is a hit, so the test 
> will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty

2018-11-01 Thread GitBox
pnowojski commented on issue #6698: [FLINK-8581][network] Move flushing remote 
subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   No, I have to rethink this whole thing and it may need a much bigger 
refactor to avoid adding more locks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski edited a comment on issue #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty

2018-11-01 Thread GitBox
pnowojski edited a comment on issue #6698: [FLINK-8581][network] Move flushing 
remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   @zhijiangW unfortunately no, I have to rethink this whole thing and it may 
need a much bigger refactor to avoid adding more locks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671351#comment-16671351
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski commented on issue #6698: [FLINK-8581][network] Move flushing remote 
subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   No, I have to rethink this whole thing and it may need a much bigger 
refactor to avoid adding more locks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671355#comment-16671355
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski edited a comment on issue #6698: [FLINK-8581][network] Move flushing 
remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#issuecomment-434985425
 
 
   @zhijiangW unfortunately no, I have to rethink this whole thing and it may 
need a much bigger refactor to avoid adding more locks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229985144
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   I didn't introduce it at first; this was done to explicitly exclude the base 
module's dependency on the lower version of the Kafka client. Otherwise it would 
cause a compilation error.
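
   If it helps the discussion, one way to see exactly which kafka-clients versions this module pulls in transitively (a sketch; run from the Flink source root, module path as in this PR):
```bash
# Lists every org.apache.kafka artifact on the module's resolved classpath,
# which should show where the lower-version client comes from.
mvn -pl flink-end-to-end-tests/flink-kafka-test -am dependency:tree -Dincludes=org.apache.kafka
```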


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671356#comment-16671356
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229985144
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   I didn't introduce it at first; this was done to explicitly exclude the base 
module's dependency on the lower version of the Kafka client. Otherwise it would 
cause a compilation error.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229985611
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-kafka-test-base/src/main/java/org/apache/flink/kafka/tests/KafkaEvent.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka.tests;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * This is a Java POJO, which Flink recognizes and will allow "by-name" 
field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} 
of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
 
 Review comment:
   OK, that sounds like a good solution.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671359#comment-16671359
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229985611
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-kafka-test-base/src/main/java/org/apache/flink/kafka/tests/KafkaEvent.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka.tests;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * This is a Java POJO, which Flink recognizes and will allow "by-name" 
field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} 
of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
 
 Review comment:
   OK, that sounds like a good solution.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys closed pull request #6959: [FLINK-10678][e2e] Introduced switch to disable log checking in e2e tests

2018-11-01 Thread GitBox
dawidwys closed pull request #6959: [FLINK-10678][e2e] Introduced switch to 
disable log checking in e2e tests
URL: https://github.com/apache/flink/pull/6959
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 46931495d27..bbb0e9e5885 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -41,19 +41,25 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 # Template for adding a test:
 
-# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>" ["skip_check_exceptions"]
+
+# IMPORTANT:
+# With the "skip_check_exceptions" flag one can disable the default checking of log files for exceptions and errors.
+# This should be done carefully though. A valid reason for doing so could be e.g. killing TMs randomly, as we cannot
+# predict which exceptions could be thrown. Whenever those checks are disabled, one should take care that proper checks
+# are performed in the test itself to ensure that the test finished in an expected state.
 
 run_test "ConnectedComponents iterations with high parallelism end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25"
 
 run_test "Queryable state (rocksdb) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
-run_test "Queryable state (rocksdb) with TM restart end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
+run_test "Queryable state (rocksdb) with TM restart end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" 
"skip_check_exceptions"
 
-run_test "Running HA dataset end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"
+run_test "Running HA dataset end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_dataset.sh" "skip_check_exceptions"
 
-run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
-run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
-run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
-run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"
+run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false" 
"skip_check_exceptions"
+run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false" 
"skip_check_exceptions"
+run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false" 
"skip_check_exceptions"
+run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true" 
"skip_check_exceptions"
 
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
@@ -87,17 +93,17 @@ run_test "Resuming Externalized Checkpoint after terminal 
failure (rocks, non-in
 run_test "Resuming Externalized Checkpoint after terminal failure (rocks, 
incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true true"
 
 run_test "DataSet allround end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
-run_test "Streaming SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
-run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
-run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
+run_test "Streaming SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
+run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" 
"skip_check_exceptions"
+run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" 
"skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
-run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file 
false false"
-run_test "Lo
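
   For reference, a usage sketch of the new switch (the test name and script here are hypothetical, not part of this diff):
```bash
# In flink-end-to-end-tests/run-nightly-tests.sh: the optional third argument
# opts a single test out of the default scan of the logs for errors/exceptions.
run_test "My randomized failure end-to-end test" "$END_TO_END_DIR/test-scripts/test_my_randomized_failure.sh" "skip_check_exceptions"
```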

[jira] [Commented] (FLINK-10678) Add a switch to run_test to configure if logs should be checked for errors/excepions

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671369#comment-16671369
 ] 

ASF GitHub Bot commented on FLINK-10678:


dawidwys closed pull request #6959: [FLINK-10678][e2e] Introduced switch to 
disable log checking in e2e tests
URL: https://github.com/apache/flink/pull/6959
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 46931495d27..bbb0e9e5885 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -41,19 +41,25 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 # Template for adding a test:
 
-# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>" ["skip_check_exceptions"]
+
+# IMPORTANT:
+# With the "skip_check_exceptions" flag one can disable the default checking of log files for exceptions and errors.
+# This should be done carefully though. A valid reason for doing so could be e.g. killing TMs randomly, as we cannot
+# predict which exceptions could be thrown. Whenever those checks are disabled, one should take care that proper checks
+# are performed in the test itself to ensure that the test finished in an expected state.
 
 run_test "ConnectedComponents iterations with high parallelism end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25"
 
 run_test "Queryable state (rocksdb) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
-run_test "Queryable state (rocksdb) with TM restart end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
+run_test "Queryable state (rocksdb) with TM restart end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" 
"skip_check_exceptions"
 
-run_test "Running HA dataset end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"
+run_test "Running HA dataset end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_dataset.sh" "skip_check_exceptions"
 
-run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
-run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
-run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
-run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"
+run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false" 
"skip_check_exceptions"
+run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false" 
"skip_check_exceptions"
+run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false" 
"skip_check_exceptions"
+run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true" 
"skip_check_exceptions"
 
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
@@ -87,17 +93,17 @@ run_test "Resuming Externalized Checkpoint after terminal 
failure (rocks, non-in
 run_test "Resuming Externalized Checkpoint after terminal failure (rocks, 
incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true true"
 
 run_test "DataSet allround end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
-run_test "Streaming SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
-run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
-run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
+run_test "Streaming SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
+run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" 
"skip_check_exceptions"
+run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" 
"skip_check_exceptions"
 run_test "Stateful stream job upgrade e

[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671395#comment-16671395
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229993379
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   What was the error? We should definitely look into it, since we shouldn't 
expect users to use the same hack in their poms.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229993379
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>flink-end-to-end-tests</artifactId>
+      <groupId>org.apache.flink</groupId>
+      <version>1.7-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-kafka-test</artifactId>
+   <name>flink-kafka-test</name>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-kafka-test-base</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   What was the error? We should definitely look into it, since we shouldn't 
expect users to use the same hack in their poms.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-01 Thread JC (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671403#comment-16671403
 ] 

JC commented on FLINK-10493:


I came across this ticket while researching the following exception I got today 
when redeploying a pipeline from a savepoint on Flink 1.5.4. Is it related?
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for DoFnOperator_165cb4197f62583ea9e72c0199dcdb4f_(1/2) from any 
of the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
    ... 5 more
Caused by: java.io.IOException: Unable to restore operator state 
[pushed-back-elements]. The previous serializer of the operator state must be 
present; the serializer could have been removed from the classpath, or its 
implementation have changed and could not be loaded. This is a temporary 
restriction that will be fixed in future versions.{code}
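
For what it's worth, a quick way to see which anonymous serializer classes a job jar actually contains (a sketch, not from the ticket; the jar name is hypothetical):
{code:bash}
# Scala's generated anonymous classes show up as ...$$anon$N entries. If these names differ
# between the jar that wrote the savepoint and the jar restoring it, the old serializer
# class can no longer be loaded.
jar tf my-flink-job.jar | grep '\$\$anon\$'
{code}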

> Macro generated CaseClassSerializer considered harmful
> --
>
> Key: FLINK-10493
> URL: https://issues.apache.org/jira/browse/FLINK-10493
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 
> 1.6.1
>Reporter: Elias Levy
>Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
> and {{TypeSerializer}} objects for types.  In the case of Scala tuple and 
> case classes, the macro generates an [anonymous {{CaseClassSerializer}} 
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
>   
> The Scala compiler will generate a name for the anonymous class that depends 
> on the relative position in the code of the macro invocation to other 
> anonymous classes.  If the code is changed such that the anonymous class 
> relative position changes, even if the overall logic of the code or the type 
> in question do not change, the name of the serializer class will change.
> That will result in errors, such as the one below, if the job is restored 
> from a savepoint, as the serializer to read the data in the savepoint will no 
> longer be found, as its name will have changed.
> At the very least, there should be a prominent warning in the documentation 
> about this issue.  Minor code changes can result in jobs that can't restore 
> previous state.  Ideally, the use of anonymous classes should be deprecated 
> if possible.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>   at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.

[GitHub] azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995108
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
 
 Review comment:
   The keystore is converted into the PEM format so that it can be used with curl; I will 
add a comment for that.
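
   For context, roughly how the PEM then gets used (a sketch; variable names follow this script, the endpoint and port are just examples):
```bash
# node.pem (exported above with an empty key passphrase) doubles as the CA bundle for --cacert
# and, when mutual auth is enabled, as the client certificate plus key for --cert.
ssl_dir="${TEST_DATA_DIR}/ssl/rest"
curl --cacert "${ssl_dir}/node.pem" --cert "${ssl_dir}/node.pem" "https://localhost:8081/jobs/overview"
```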


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995229
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${ssl_dir}/node.pem"
+}
+
+function set_conf_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
 
 Review comment:
   I will modify the batch allround test to use only the server version


Thi

[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671409#comment-16671409
 ] 

ASF GitHub Bot commented on FLINK-10628:


azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995108
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
 
 Review comment:
   The keystore is converted into the PEM format so that it can be used with curl; I will add a comment explaining that.
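   For context, a minimal sketch of that conversion and the curl calls it enables, assuming the paths and the literal `rest.password` store password produced by `_set_conf_ssl_helper "rest"` above; the endpoint URL is only an example:
   
   ```bash
   # curl cannot read the PKCS12 keystore directly, so it is exported once to PEM
   openssl pkcs12 -passin pass:rest.password \
       -in "${TEST_DATA_DIR}/ssl/rest/node.keystore" \
       -passout pass: -out "${TEST_DATA_DIR}/ssl/rest/node.pem" -nodes
   
   # server auth only: verify the server certificate against the exported PEM
   curl --cacert "${TEST_DATA_DIR}/ssl/rest/node.pem" "https://localhost:8081/overview"
   
   # mutual auth: additionally present the same PEM as the client certificate
   curl --cacert "${TEST_DATA_DIR}/ssl/rest/node.pem" \
        --cert "${TEST_DATA_DIR}/ssl/rest/node.pem" "https://localhost:8081/overview"
   ```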


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add end-to-end test for REST communication encryption
> -
>
> Key: FLINK-10628
> URL: https://issues.apache.org/jira/browse/FLINK-10628
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10371 we added support for mutual authentication for the REST 
> communication. We should adapt one of the existing end-to-end tests to 
> require this feature for the REST communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671411#comment-16671411
 ] 

ASF GitHub Bot commented on FLINK-10628:


azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995229
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert

[GitHub] azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995637
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert ${ssl_dir}/node.pem"
 
 Review comment:
   OK, I will source `common_ssl.sh` in `common.sh` close to the modified vars and add a comment. I still want to keep SSL modularised to reduce the size of the already huge `common.sh` file.
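   A rough sketch of that layout, assuming `common.sh` keeps the plain-HTTP defaults and only pulls in the SSL helpers (`${TEST_INFRA_DIR}` is a placeholder for wherever the test scripts live):
   
   ```bash
   # in common.sh: defaults used by tests that do not enable SSL
   REST_PROTOCOL="http"
   CURL_SSL_ARGS=""
   
   # source the SSL helpers right next to the variables they may override
   source "${TEST_INFRA_DIR}/common_ssl.sh"
   ```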


[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671414#comment-16671414
 ] 

ASF GitHub Bot commented on FLINK-10628:


azagrebin commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229995637
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_ssl.sh
 ##
 @@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+function _set_conf_ssl_helper {
+local type=$1 # 'internal' or external 'rest'
+local ssl_dir="${TEST_DATA_DIR}/ssl/${type}"
+local password="${type}.password"
+
+if [ "${type}" != "internal" ] && [ "${type}" != "rest" ]; then
+echo "Unknown type of ssl connectivity: ${type}. It can be either 
'internal' or external 'rest'"
+exit 1
+fi
+
+# clean up the dir that will be used for SSL certificates and trust stores
+if [ -e "${ssl_dir}" ]; then
+   echo "File ${ssl_dir} exists. Deleting it..."
+   rm -rf "${ssl_dir}"
+fi
+mkdir -p "${ssl_dir}"
+
+SANSTRING="dns:${NODENAME}"
+for NODEIP in $(get_node_ip) ; do
+SANSTRING="${SANSTRING},ip:${NODEIP}"
+done
+
+echo "Using SAN ${SANSTRING}"
+
+# create certificates
+keytool -genkeypair -alias ca -keystore "${ssl_dir}/ca.keystore" -dname 
"CN=Sample CA" -storepass ${password} -keypass ${password} -keyalg RSA -ext 
bc=ca:true -storetype PKCS12
+keytool -keystore "${ssl_dir}/ca.keystore" -storepass ${password} -alias 
ca -exportcert > "${ssl_dir}/ca.cer"
+keytool -importcert -keystore "${ssl_dir}/ca.truststore" -alias ca 
-storepass ${password} -noprompt -file "${ssl_dir}/ca.cer"
+
+keytool -genkeypair -alias node -keystore "${ssl_dir}/node.keystore" 
-dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass ${password} -keypass 
${password} -keyalg RSA -storetype PKCS12
+keytool -certreq -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -alias node -file "${ssl_dir}/node.csr"
+keytool -gencert -keystore "${ssl_dir}/ca.keystore" -storepass ${password} 
-alias ca -ext SAN=${SANSTRING} -infile "${ssl_dir}/node.csr" -outfile 
"${ssl_dir}/node.cer"
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/ca.cer" -alias ca -noprompt
+keytool -importcert -keystore "${ssl_dir}/node.keystore" -storepass 
${password} -file "${ssl_dir}/node.cer" -alias node -noprompt
+
+openssl pkcs12 -passin pass:${password} -in "${ssl_dir}/node.keystore" 
-passout pass: -out "${ssl_dir}/node.pem" -nodes
+
+# adapt config
+# (here we rely on security.ssl.enabled enabling SSL for all components 
and internal as well as
+# external communication channels)
+set_conf security.ssl.${type}.enabled true
+set_conf security.ssl.${type}.keystore ${ssl_dir}/node.keystore
+set_conf security.ssl.${type}.keystore-password ${password}
+set_conf security.ssl.${type}.key-password ${password}
+set_conf security.ssl.${type}.truststore ${ssl_dir}/ca.truststore
+set_conf security.ssl.${type}.truststore-password ${password}
+}
+
+function set_conf_internal_ssl {
+_set_conf_ssl_helper "internal"
+}
+
+function set_conf_mutual_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local mutual="false"
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+if [ "${auth}" == "mutual" ]; then
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cert ${ssl_dir}/node.pem"
+mutual="true";
+fi
+echo "Mutual ssl auth: ${mutual}"
+set_conf security.ssl.rest.authentication-enabled ${mutual}
+}
+
+function set_conf_rest_ssl {
+local auth="${1:-server}" # only 'server' or 'mutual'
+local ssl_dir="${TEST_DATA_DIR}/ssl/rest"
+_set_conf_ssl_helper "rest"
+set_conf_mutual_rest_ssl ${auth}
+REST_PROTOCOL="https"
+CURL_SSL_ARGS="${CURL_SSL_ARGS} --cacert

[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229997184
 
 

 ##
 File path: docs/ops/security-ssl.md
 ##
 @@ -252,6 +252,25 @@ security.ssl.rest.key-password: rest_key_password
 security.ssl.rest.truststore-password: ca_truststore_password
 {% endhighlight %}
 
+**Tips to query REST Endpoint with curl utility**
+
+You can generate convert the keystore into the `PEM` format using `openssl`:
 
 Review comment:
   remove `generate`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] Enable mutual REST SSL auth in e2e tests

2018-11-01 Thread GitBox
zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229997416
 
 

 ##
 File path: docs/ops/security-ssl.md
 ##
 @@ -252,6 +252,25 @@ security.ssl.rest.key-password: rest_key_password
 security.ssl.rest.truststore-password: ca_truststore_password
 {% endhighlight %}
 
+**Tips to query REST Endpoint with curl utility**
+
+You can generate convert the keystore into the `PEM` format using `openssl`:
+
+{% highlight bash %}
+openssl pkcs12 -passin pass:rest_keystore_password -in rest.keystore -out 
rest.pem -nodes
+{% endhighlight %}
+
+Then you can query REST Endpoint with `curl`:
+
+{% highlight bash %}
+curl --cacert rest.pem flink_url
+{% endhighlight %}
+
+If mutual SSL is enabled:
+
+{% highlight bash %}
+curl --cacert rest.pem  --cert rest.pem flink_url
 
 Review comment:
   remove second space after `--cacert rest.pem`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671432#comment-16671432
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229997184
 
 

 ##
 File path: docs/ops/security-ssl.md
 ##
 @@ -252,6 +252,25 @@ security.ssl.rest.key-password: rest_key_password
 security.ssl.rest.truststore-password: ca_truststore_password
 {% endhighlight %}
 
+**Tips to query REST Endpoint with curl utility**
+
+You can generate convert the keystore into the `PEM` format using `openssl`:
 
 Review comment:
   remove `generate`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add end-to-end test for REST communication encryption
> -
>
> Key: FLINK-10628
> URL: https://issues.apache.org/jira/browse/FLINK-10628
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10371 we added support for mutual authentication for the REST 
> communication. We should adapt one of the existing end-to-end tests to 
> require this feature for the REST communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10628) Add end-to-end test for REST communication encryption

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671431#comment-16671431
 ] 

ASF GitHub Bot commented on FLINK-10628:


zentol commented on a change in pull request #6967: [FLINK-10628][E2E][SSL] 
Enable mutual REST SSL auth in e2e tests
URL: https://github.com/apache/flink/pull/6967#discussion_r229997416
 
 

 ##
 File path: docs/ops/security-ssl.md
 ##
 @@ -252,6 +252,25 @@ security.ssl.rest.key-password: rest_key_password
 security.ssl.rest.truststore-password: ca_truststore_password
 {% endhighlight %}
 
+**Tips to query REST Endpoint with curl utility**
+
+You can generate convert the keystore into the `PEM` format using `openssl`:
+
+{% highlight bash %}
+openssl pkcs12 -passin pass:rest_keystore_password -in rest.keystore -out 
rest.pem -nodes
+{% endhighlight %}
+
+Then you can query REST Endpoint with `curl`:
+
+{% highlight bash %}
+curl --cacert rest.pem flink_url
+{% endhighlight %}
+
+If mutual SSL is enabled:
+
+{% highlight bash %}
+curl --cacert rest.pem  --cert rest.pem flink_url
 
 Review comment:
   remove second space after `--cacert rest.pem`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add end-to-end test for REST communication encryption
> -
>
> Key: FLINK-10628
> URL: https://issues.apache.org/jira/browse/FLINK-10628
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10371 we added support for mutual authentication for the REST 
> communication. We should adapt one of the existing end-to-end tests to 
> require this feature for the REST communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task spread-out in scheduling with loc…

2018-11-01 Thread GitBox
tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
 	/** This contains desired allocation ids of the slot. */
 	@Nonnull
-	private final Collection<AllocationID> priorAllocations;
+	private final Collection<AllocationID> preferredAllocations;
+
+	/** This contains all prior allocation ids from the whole execution graph. */
+	@Nonnull
+	private final Set<AllocationID> previousExecutionGraphAllocations;
+
+	public SlotProfile(
+		@Nonnull ResourceProfile resourceProfile,
+		@Nonnull Collection<TaskManagerLocation> preferredLocations,
+		@Nonnull Collection<AllocationID> preferredAllocations) {
+
+		this(resourceProfile, preferredLocations, preferredAllocations, Collections.emptySet());
+	}
 
 	public SlotProfile(
 		@Nonnull ResourceProfile resourceProfile,
 		@Nonnull Collection<TaskManagerLocation> preferredLocations,
-		@Nonnull Collection<AllocationID> priorAllocations) {
+		@Nonnull Collection<AllocationID> preferredAllocations,
+		@Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
 		this.resourceProfile = resourceProfile;
 		this.preferredLocations = preferredLocations;
-		this.priorAllocations = priorAllocations;
+		this.preferredAllocations = preferredAllocations;
+		this.previousExecutionGraphAllocations = previousExecutionGraphAllocations;
 	}
 
 	/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
 	 * Returns the desired allocation ids for the slot.
 	 */
 	@Nonnull
-	public Collection<AllocationID> getPriorAllocations() {
-		return priorAllocations;
+	public Collection<AllocationID> getPreferredAllocations() {
+		return preferredAllocations;
+	}
+
+	/**
+	 * Returns a set of all previous allocation ids from the execution graph.
+	 *
+	 * <p>This is optional and can be empty if unused.
+	 */
+	@Nonnull
+	public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+		return previousExecutionGraphAllocations;
 	}
 
 	/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   Collections.emptySet());
}
 
/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
 * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
 *   immediately deploy it.
 * @param locationPreferenceConstraint constraint for the location 
preferences
+* @param allPreviousExecutionGraphAllocationIds set with all previous 
allocation ids in the job graph.
+* Can be empty if the 
allocation ids are not required for scheduling.
 * @return Fu

[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671441#comment-16671441
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
 	/** This contains desired allocation ids of the slot. */
 	@Nonnull
-	private final Collection<AllocationID> priorAllocations;
+	private final Collection<AllocationID> preferredAllocations;
+
+	/** This contains all prior allocation ids from the whole execution graph. */
+	@Nonnull
+	private final Set<AllocationID> previousExecutionGraphAllocations;
+
+	public SlotProfile(
+		@Nonnull ResourceProfile resourceProfile,
+		@Nonnull Collection<TaskManagerLocation> preferredLocations,
+		@Nonnull Collection<AllocationID> preferredAllocations) {
+
+		this(resourceProfile, preferredLocations, preferredAllocations, Collections.emptySet());
+	}
 
 	public SlotProfile(
 		@Nonnull ResourceProfile resourceProfile,
 		@Nonnull Collection<TaskManagerLocation> preferredLocations,
-		@Nonnull Collection<AllocationID> priorAllocations) {
+		@Nonnull Collection<AllocationID> preferredAllocations,
+		@Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
 		this.resourceProfile = resourceProfile;
 		this.preferredLocations = preferredLocations;
-		this.priorAllocations = priorAllocations;
+		this.preferredAllocations = preferredAllocations;
+		this.previousExecutionGraphAllocations = previousExecutionGraphAllocations;
 	}
 
 	/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
 	 * Returns the desired allocation ids for the slot.
 	 */
 	@Nonnull
-	public Collection<AllocationID> getPriorAllocations() {
-		return priorAllocations;
+	public Collection<AllocationID> getPreferredAllocations() {
+		return preferredAllocations;
+	}
+
+	/**
+	 * Returns a set of all previous allocation ids from the execution graph.
+	 *
+	 * <p>This is optional and can be empty if unused.
+	 */
+	@Nonnull
+	public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+		return previousExecutionGraphAllocations;
 	}
 
 	/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   Collections.emptySet());
}
 
/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
 * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
 *   immediately deploy it.
 * @param locationPreferenceConstraint constraint for the location 
preferences
+

[GitHub] StefanRRichter commented on issue #6972: [FLINK-9635][scheduling] Avoid task spread-out in scheduling with loc…

2018-11-01 Thread GitBox
StefanRRichter commented on issue #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972#issuecomment-435000698
 
 
   Thanks @tillrohrmann !


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671443#comment-16671443
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on issue #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972#issuecomment-435000698
 
 
   Thanks @tillrohrmann !


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that it tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour can cause 
> every task to be deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9635.

   Resolution: Fixed
Fix Version/s: 1.6.3

Fixed in 1.6.3 via 
https://github.com/apache/flink/commit/04df02b4728d40b59417ccc8ee281ab3298b09da

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that it tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour can cause 
> every task to be deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-9635:
--

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that it tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour can cause 
> every task to be deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10746) Need to replace transfer.sh for Travis log upload because it shuts down

2018-11-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10746:
-

 Summary: Need to replace transfer.sh for Travis log upload because 
it shuts down
 Key: FLINK-10746
 URL: https://issues.apache.org/jira/browse/FLINK-10746
 Project: Flink
  Issue Type: Task
  Components: Build System
Affects Versions: 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.8.0


We need to replace {{transfer.sh}} as the destination for our Travis log upload 
because it is about to shut down (see https://transfer.sh/).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reopened FLINK-10097:


> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671318#comment-16671318
 ] 

Kostas Kloudas edited comment on FLINK-10097 at 11/1/18 10:55 AM:
--

Merged on master with 9ad932c52056d0ed7e96e1c385403a529e6a8edb
and on 1.6 with b01080172735a7e9cac23a0d133bccf5c6945272


was (Author: kkl0u):
Merged on master with 9ad932c52056d0ed7e96e1c385403a529e6a8edb

> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-10097:
---
Fix Version/s: 1.6.3

> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10746) Need to replace transfer.sh for Travis log upload because it shuts down

2018-11-01 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671461#comment-16671461
 ] 

Chesnay Schepler commented on FLINK-10746:
--

I would suggest reworking this ticket to be only about removing the usage, and 
creating a separate issue for finding a replacement.

> Need to replace transfer.sh for Travis log upload because it shuts down
> ---
>
> Key: FLINK-10746
> URL: https://issues.apache.org/jira/browse/FLINK-10746
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> We need to replace {{transfer.sh}} as the destination for our Travis log 
> upload because it is about to shut down (see https://transfer.sh/).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-11-01 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-10097.
--
Resolution: Fixed

> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10748) Jobmanager in HA setup redirects to ip address instead of hostname

2018-11-01 Thread Jeroen Steggink (JIRA)
Jeroen Steggink created FLINK-10748:
---

 Summary: Jobmanager in HA setup redirects to ip address instead of 
hostname
 Key: FLINK-10748
 URL: https://issues.apache.org/jira/browse/FLINK-10748
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.6.2, 1.5.5, 1.4.2, 1.3.3
Reporter: Jeroen Steggink


In an HA Jobmanager setup, connecting to a follower redirects (HTTP/1.1 307 
Temporary Redirect) to the leader. However, it redirects to an IP address 
instead of the hostname. This can result in a situation where the server cannot 
be resolved because the routing is done based on the hostname.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10747) CoGroupConnectedComponentsITCase deadlocked on Travis

2018-11-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10747:
-

 Summary: CoGroupConnectedComponentsITCase deadlocked on Travis
 Key: FLINK-10747
 URL: https://issues.apache.org/jira/browse/FLINK-10747
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


The {{CoGroupConnectedComponentsITCase}} deadlocks on Travis: 
https://api.travis-ci.org/v3/job/449230962/log.txt.

It looks as if the iteration gets stuck waiting in 
{{SuperstepKickoffLatch.awaitStartOfSuperstepOrTermination}} without ever 
getting a proper notification. This might indicate a serious bug in our 
iteration implementation. This could also be related to FLINK-10741.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r230007754
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-kafka-test</artifactId>
+	<name>flink-kafka-test</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-kafka-test-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   ```
   03:59:30.102 [WARNING] 
   Dependency convergence error for org.apache.kafka:kafka-clients:0.10.2.1 
paths to dependency are:
   +-org.apache.flink:flink-kafka-test:1.7-SNAPSHOT
 +-org.apache.flink:flink-connector-kafka_2.11:1.7-SNAPSHOT
   +-org.apache.flink:flink-connector-kafka-base_2.11:1.7-SNAPSHOT
 +-org.apache.kafka:kafka-clients:0.10.2.1
   and
   +-org.apache.flink:flink-kafka-test:1.7-SNAPSHOT
 +-org.apache.flink:flink-connector-kafka_2.11:1.7-SNAPSHOT
   +-org.apache.kafka:kafka-clients:2.0.0
   
   03:59:30.103 [WARNING] Rule 0: 
org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
   Failed while enforcing releasability. See above detailed error message.
   ```
   
   details : https://api.travis-ci.org/v3/job/449173244/log.txt
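   For reference, one way to inspect the conflicting `kafka-clients` versions locally; the module path is taken from the warning above, and the exact Maven invocation is just a suggestion:
   
   ```bash
   mvn dependency:tree -pl flink-end-to-end-tests/flink-kafka-test -am \
       -Dincludes=org.apache.kafka:kafka-clients
   ```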


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671471#comment-16671471
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r230007754
 
 

 ##
 File path: flink-end-to-end-tests/flink-kafka-test/pom.xml
 ##
 @@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-kafka-test</artifactId>
+	<name>flink-kafka-test</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-kafka-test-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   ```
   03:59:30.102 [WARNING] 
   Dependency convergence error for org.apache.kafka:kafka-clients:0.10.2.1 
paths to dependency are:
   +-org.apache.flink:flink-kafka-test:1.7-SNAPSHOT
 +-org.apache.flink:flink-connector-kafka_2.11:1.7-SNAPSHOT
   +-org.apache.flink:flink-connector-kafka-base_2.11:1.7-SNAPSHOT
 +-org.apache.kafka:kafka-clients:0.10.2.1
   and
   +-org.apache.flink:flink-kafka-test:1.7-SNAPSHOT
 +-org.apache.flink:flink-connector-kafka_2.11:1.7-SNAPSHOT
   +-org.apache.kafka:kafka-clients:2.0.0
   
   03:59:30.103 [WARNING] Rule 0: 
org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
   Failed while enforcing releasability. See above detailed error message.
   ```
   
   details : https://api.travis-ci.org/v3/job/449173244/log.txt


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-01 Thread GitBox
aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] 
Hardened kerberized yarn e2e test
URL: https://github.com/apache/flink/pull/6965#discussion_r230008139
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
 ##
 @@ -60,19 +64,41 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is 
ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time - start_time))
+
+if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
+echo "ERROR: Could not start hadoop cluster. Aborting..."
+exit 0
+else
+echo "Waiting for hadoop cluster to come up. We have been trying for 
$time_diff seconds, retrying ..."
+sleep 10
+fi
 done
 
+# perform health checks
+if ! { [ $(docker inspect -f '{{.State.Running}}' master 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' slave1 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' slave2 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' kdc 2>&1) = 'true' ]; };
+then
+echo "ERROR: Could not start hadoop cluster. At least one of the 
containers failed. Aborting..."
+exit 0
 
 Review comment:
   Isn't exit code `1` the exit code for failure?
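   For comparison, the usual bash convention the question refers to, sketched on the abort path quoted above:
   
   ```bash
   echo "ERROR: Could not start hadoop cluster. Aborting..."
   exit 1   # non-zero exit status signals failure to the calling test harness
   ```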


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-01 Thread GitBox
aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] 
Hardened kerberized yarn e2e test
URL: https://github.com/apache/flink/pull/6965#discussion_r230008106
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
 ##
 @@ -60,19 +64,41 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is 
ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time - start_time))
+
+if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
+echo "ERROR: Could not start hadoop cluster. Aborting..."
+exit 0
 
 Review comment:
   Isn't exit code `1` the exit code for failure?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10368) 'Kerberized YARN on Docker test' instable

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671476#comment-16671476
 ] 

ASF GitHub Bot commented on FLINK-10368:


aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] 
Hardened kerberized yarn e2e test
URL: https://github.com/apache/flink/pull/6965#discussion_r230008139
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
 ##
 @@ -60,19 +64,41 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is 
ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time - start_time))
+
+if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
+echo "ERROR: Could not start hadoop cluster. Aborting..."
+exit 0
+else
+echo "Waiting for hadoop cluster to come up. We have been trying for 
$time_diff seconds, retrying ..."
+sleep 10
+fi
 done
 
+# perform health checks
+if ! { [ $(docker inspect -f '{{.State.Running}}' master 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' slave1 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' slave2 2>&1) = 'true' ] &&
+   [ $(docker inspect -f '{{.State.Running}}' kdc 2>&1) = 'true' ]; };
+then
+echo "ERROR: Could not start hadoop cluster. At least one of the 
containers failed. Aborting..."
+exit 0
 
 Review comment:
   Isn't exit code `1` the exit code for failure?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> 'Kerberized YARN on Docker test' instable
> -
>
> Key: FLINK-10368
> URL: https://issues.apache.org/jira/browse/FLINK-10368
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Running the 'Kerberized YARN on Docker' end-to-end test failed on an AWS 
> instance. The problem seems to be that the NameNode went into safe-mode due 
> to limited resources.
> {code}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-19 09:04:39,201 INFO  org.apache.hadoop.security.UserGroupInformation 
>   - Login successful for user hadoop-user using keytab file 
> /home/hadoop-user/hadoop-user.keytab
> 2018-09-19 09:04:39,453 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> master.docker-hadoop-cluster-network/172.22.0.3:8032
> 2018-09-19 09:04:39,640 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> master.docker-hadoop-cluster-network/172.22.0.3:10200
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,901 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=2000, 
> taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1}
> 2018-09-19 09:04:40,286 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both 
> LOG4J and Logback configuration files. Please delete
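As an aside on the safe-mode theory above: whether the NameNode is actually stuck in safe mode can be checked, and if necessary cleared, with the standard HDFS admin CLI. A minimal sketch, assuming the hadoop binaries are on the PATH inside the container named "master":

{code}
# Ask the NameNode whether it is currently in safe mode.
docker exec master hdfs dfsadmin -safemode get

# If it reports "Safe mode is ON" on a resource-constrained host,
# safe mode can be left manually (use with care outside of test environments).
docker exec master hdfs dfsadmin -safemode leave
{code}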

[jira] [Commented] (FLINK-10368) 'Kerberized YARN on Docker test' instable

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671477#comment-16671477
 ] 

ASF GitHub Bot commented on FLINK-10368:


aljoscha commented on a change in pull request #6965: [FLINK-10368][e2e] 
Hardened kerberized yarn e2e test
URL: https://github.com/apache/flink/pull/6965#discussion_r230008106
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
 ##
 @@ -60,19 +64,41 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time - start_time))
+
+if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
+echo "ERROR: Could not start hadoop cluster. Aborting..."
+exit 0
 
 Review comment:
   Isn't exit code `1` the exit code for failure?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> 'Kerberized YARN on Docker test' instable
> -
>
> Key: FLINK-10368
> URL: https://issues.apache.org/jira/browse/FLINK-10368
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Running Kerberized YARN on Docker test end-to-end test failed on an AWS 
> instance. The problem seems to be that the NameNode went into safe-mode due 
> to limited resources.
> {code}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-19 09:04:39,201 INFO  org.apache.hadoop.security.UserGroupInformation 
>   - Login successful for user hadoop-user using keytab file 
> /home/hadoop-user/hadoop-user.keytab
> 2018-09-19 09:04:39,453 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> master.docker-hadoop-cluster-network/172.22.0.3:8032
> 2018-09-19 09:04:39,640 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> master.docker-hadoop-cluster-network/172.22.0.3:10200
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,901 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=2000, 
> taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1}
> 2018-09-19 09:04:40,286 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both 
> LOG4J and Logback configuration files. Please delete or rename one of them.
> 
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:259)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameter

[jira] [Updated] (FLINK-10747) CoGroupConnectedComponentsITCase deadlocked on Travis

2018-11-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10747:
--
Priority: Blocker  (was: Critical)

> CoGroupConnectedComponentsITCase deadlocked on Travis
> -
>
> Key: FLINK-10747
> URL: https://issues.apache.org/jira/browse/FLINK-10747
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The {{CoGroupConnectedComponentsITCase}} deadlocks on Travis: 
> https://api.travis-ci.org/v3/job/449230962/log.txt.
> It looks as if the iteration gets stuck waiting in 
> {{SuperstepKickoffLatch.awaitStartOfSuperstepOrTermination}} without ever 
> getting a proper notification. This might indicate a serious bug in our 
> iteration implementation. This could also be related to FLINK-10741.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671486#comment-16671486
 ] 

vinoyang commented on FLINK-10704:
--

[~twalthr] You are right, I have fixed the exception. Regarding the previous problem with the hash value, the actual content is:

 
{code:java}
{
  "hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [
  {
"_index" : "my_users",
"_type" : "user",
"_id" : "1_Bob  ",
"_score" : 1.0,
"_source" : {
  "user_id" : 1,
  "user_name" : "Bob  ",
  "user_count" : 1
}
  },
  {
"_index" : "my_users",
"_type" : "user",
"_id" : "22_Alice",
"_score" : 1.0,
"_source" : {
  "user_id" : 22,
  "user_name" : "Alice",
  "user_count" : 1
}
  },
  {
"_index" : "my_users",
"_type" : "user",
"_id" : "42_Greg ",
"_score" : 1.0,
"_source" : {
  "user_id" : 42,
  "user_name" : "Greg ",
  "user_count" : 3
}
  }
]
  }
}

{code}
But I do not know what "*982cb32908def9801e781381c1b8a8db*" means, so I cannot 
compare them. Can you tell me the expected value?
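For context, the expected value in such a check is typically an md5 digest of the normalized test output computed by the end-to-end script. A rough, illustrative way to reproduce it locally, assuming Elasticsearch on its default port and that only the user fields shown above are hashed, would be:

{code}
# Extract the checked fields, sort them for a stable order, and hash the result.
curl -s 'http://localhost:9200/my_users/_search?pretty' \
  | grep -E '"user_(id|name|count)"' \
  | sort \
  | md5sum
{code}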

 

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filters>
>    <filter>
>       <artifact>*:*</artifact>
>       <excludes>
>          <exclude>kafka/kafka-version.properties</exclude>
>       </excludes>
>    </filter>
> </filters>
> {code}
> When the shell scans the logs for the "error" keyword with grep, this line 
> matches, so the test will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}
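Editorial sketch: if the goal were only to keep this benign WARN line from tripping the scan, the simplest option would be to whitelist it in the same exclusion chain as the entries above (condensed here for brevity; this is an illustration, not necessarily the fix chosen in the PR):

{code}
# Filter the benign kafka-version warning before counting "error" occurrences.
error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
  | grep -v "Error while loading kafka-version.properties" \
  | grep -ic "error")
{code}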



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski opened a new pull request #6981: [FLINK-10638][table] Invalid table scan resolution for temporal join queries

2018-11-01 Thread GitBox
pnowojski opened a new pull request #6981: [FLINK-10638][table] Invalid table 
scan resolution for temporal join queries
URL: https://github.com/apache/flink/pull/6981
 
 
   Previously there was a strict, fixed order of applying the LogicalCorrelateToTemporalTableJoinRule
   and TableScanRule rules. This was causing problems, since either of them could create new
   RelNodes that then have to be subject to the other rule (imagine a deeply nested TemporalTableFunction
   that references registered tables/views and other TemporalTableFunctions).
   
   The solution is to run both of those rules in one group/collection in the HepPlanner.
   Instead of applying one rule to the whole tree and then the other, both rules are applied to
   a parent node before going deeper.
   
   ## Verifying this change
   
   This change modifies existing tests to provide test coverage for this bug.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10638) Invalid table scan resolution for temporal join queries

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671493#comment-16671493
 ] 

ASF GitHub Bot commented on FLINK-10638:


pnowojski opened a new pull request #6981: [FLINK-10638][table] Invalid table 
scan resolution for temporal join queries
URL: https://github.com/apache/flink/pull/6981
 
 
   Previously there was a strict, fixed order of applying the LogicalCorrelateToTemporalTableJoinRule
   and TableScanRule rules. This was causing problems, since either of them could create new
   RelNodes that then have to be subject to the other rule (imagine a deeply nested TemporalTableFunction
   that references registered tables/views and other TemporalTableFunctions).
   
   The solution is to run both of those rules in one group/collection in the HepPlanner.
   Instead of applying one rule to the whole tree and then the other, both rules are applied to
   a parent node before going deeper.
   
   ## Verifying this change
   
   This change modifies existing tests to provide test coverage for this bug.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Invalid table scan resolution for temporal join queries
> ---
>
> Key: FLINK-10638
> URL: https://issues.apache.org/jira/browse/FLINK-10638
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Registered tables that contain a temporal join are not properly resolved when 
> performing a table scan.
> A planning error occurs when registering a table with a temporal join and 
> reading from it again.
> {code}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{2}])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   
> LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) 
> NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME 
> ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671499#comment-16671499
 ] 

Timo Walther commented on FLINK-10704:
--

So you cannot run the e2e test successfully before your changes? This sounds 
like a bug in the e2e test to me then.

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filters>
>    <filter>
>       <artifact>*:*</artifact>
>       <excludes>
>          <exclude>kafka/kafka-version.properties</exclude>
>       </excludes>
>    </filter>
> </filters>
> {code}
> When the shell scans the logs for the "error" keyword with grep, this line 
> matches, so the test will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10749) GraphOperationsITCase deadlocks on travis

2018-11-01 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10749:


 Summary: GraphOperationsITCase deadlocks on travis
 Key: FLINK-10749
 URL: https://issues.apache.org/jira/browse/FLINK-10749
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/449173144



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671502#comment-16671502
 ] 

vinoyang commented on FLINK-10704:
--

[~twalthr] Now it reports this error:

 
{code:java}
FAIL SQL Client Elasticsearch Upsert: Output hash mismatch.  Got 
6187222e109ee9222e6b2f117742070c, expected 982cb32908def9801e781381c1b8a8db.
{code}

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filters>
>    <filter>
>       <artifact>*:*</artifact>
>       <excludes>
>          <exclude>kafka/kafka-version.properties</exclude>
>       </excludes>
>    </filter>
> </filters>
> {code}
> When the shell scans the logs for the "error" keyword with grep, this line 
> matches, so the test will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10704) Fix sql client end to end test failure

2018-11-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671502#comment-16671502
 ] 

vinoyang edited comment on FLINK-10704 at 11/1/18 11:55 AM:


[~twalthr] Now it reports this error : 

 
{code:java}
FAIL SQL Client Elasticsearch Upsert: Output hash mismatch.  Got 
6187222e109ee9222e6b2f117742070c, expected 
982cb32908def9801e781381c1b8a8db.{code}

The end-to-end tests will fail.


was (Author: yanghua):
[~twalthr] Now it reports this error : 

 
{code:java}
FAIL SQL Client Elasticsearch Upsert: Output hash mismatch.  Got 
6187222e109ee9222e6b2f117742070c, expected 982cb32908def9801e781381c1b8a8db.
{code}

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filters>
>    <filter>
>       <artifact>*:*</artifact>
>       <excludes>
>          <exclude>kafka/kafka-version.properties</exclude>
>       </excludes>
>    </filter>
> </filters>
> {code}
> When the shell scans the logs for the "error" keyword with grep, this line 
> matches, so the test will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10749) GraphOperationsITCase deadlocks on travis

2018-11-01 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10749.

Resolution: Duplicate

> GraphOperationsITCase deadlocks on travis
> -
>
> Key: FLINK-10749
> URL: https://issues.apache.org/jira/browse/FLINK-10749
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> https://travis-ci.org/apache/flink/jobs/449173144



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10741) GraphOperationsITCase.testFilterOnVerticesSugar deadlocks on Travis

2018-11-01 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671504#comment-16671504
 ] 

Chesnay Schepler commented on FLINK-10741:
--

Another instance for the {{testFilterOnVertices}} test: 
https://travis-ci.org/apache/flink/jobs/449173144

> GraphOperationsITCase.testFilterOnVerticesSugar deadlocks on Travis
> ---
>
> Key: FLINK-10741
> URL: https://issues.apache.org/jira/browse/FLINK-10741
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The {{GraphOperationsITCase.testFilterOnVerticesSugar}} test deadlocks on 
> Travis:
> https://api.travis-ci.org/v3/job/448905716/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

