[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513429#comment-16513429
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r195644101
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

At the _very least_ we should already be using TM IDs.
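For illustration, a minimal sketch of that suggestion: derive the Pushgateway job 
name from a stable, configured TaskManager identifier instead of a fresh random 
AbstractID, so a restarted TaskManager overwrites its previous Pushgateway group 
rather than leaking a new one. The "taskmanager.id" key and the jobNameFor helper 
are hypothetical, not Flink's actual API.

    import org.apache.flink.metrics.MetricConfig;

    public class JobNameSketch {
        public static final char JOB_NAME_SEPARATOR = '-';
        public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;

        // Hypothetical lookup: prefer a configured, stable id over a random one.
        static String jobNameFor(MetricConfig config) {
            String tmId = config.getString("taskmanager.id", null);
            return JOB_NAME_PREFIX + (tmId != null ? tmId : "unknown");
        }
    }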


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink system able to send metrics to Prometheus via a Pushgateway.
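
For context, a minimal sketch of the push cycle such a Scheduled reporter 
performs, assuming the simpleclient_pushgateway dependency. The address and job 
name are placeholders; the actual reporter would use its configured host and 
port and push its own registry.

    import java.io.IOException;

    import io.prometheus.client.CollectorRegistry;
    import io.prometheus.client.exporter.PushGateway;

    public class PushGatewayReportSketch {
        // Placeholder address; the reporter reads "host" and "port" from its MetricConfig.
        private final PushGateway pushGateway = new PushGateway("localhost:9091");
        private final String jobName = "flink-example";

        // Invoked periodically because the reporter implements Scheduled.
        public void report() {
            try {
                // Push every collector in the registry to the gateway under jobName.
                pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
            } catch (IOException e) {
                // The real reporter would log the failure instead of swallowing it.
            }
        }
    }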



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513423#comment-16513423
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r195643808
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

I just don't see a reason to rush this. There's a known issue we have to 
fix, and the PR is not at risk of becoming outdated in the meantime. If the 
argument is that "other people might start using it already", then we may just 
end up unnecessarily breaking their setup before the release.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink system able to send metrics to Prometheus via a Pushgateway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513390#comment-16513390
 ] 

Shimin Yang commented on FLINK-9567:


I modified the onContainerCompleted method and am running a test with it.
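
A speculative, self-contained model (not the actual patch) of the kind of guard 
such a modification could add: only count a replacement request for a completed 
container if that container still had an open TaskManager connection, so 
numPendingContainerRequests cannot be inflated by containers that died before 
the restart.

    import java.util.HashSet;
    import java.util.Set;

    class PendingRequestModel {
        private final Set<String> openTaskManagerConnections = new HashSet<>();
        private int numPendingContainerRequests;

        void onContainerCompleted(String containerId) {
            // Hypothetical guard: a container whose TaskManager connection is already
            // gone (e.g. killed before a JobManager restart) must not trigger a
            // replacement request, otherwise the counter never drains.
            if (openTaskManagerConnections.remove(containerId)) {
                numPendingContainerRequests++; // request a replacement container
            }
        }

        int pending() {
            return numPendingContainerRequests;
        }
    }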

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
> release task manager containers in certain specific cases. In the worst case, I 
> had a job configured for 5 task managers but ended up with more than 100 
> containers. Although the job did not fail, it affected other jobs in the Yarn 
> cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
> in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
> no longer had a connection to the TaskManager on container 24, so it simply 
> ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which increased the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this container 
> cannot be returned even though it is no longer needed. Meanwhile, the restart 
> logic has already allocated enough containers for the Task Managers, so Flink 
> holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-14 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9455:
-

Assignee: Till Rohrmann  (was: Sihua Zhou)

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.
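
As a rough illustration of the proposed accounting (names are illustrative, not 
Flink's actual API): once the SlotManager knows how many slots each new 
TaskManager brings, it can request ceil(pendingSlotRequests / slotsPerTaskManager) 
TaskManagers rather than one per pending slot.

    class SlotAccountingSketch {
        static int taskManagersToRequest(int pendingSlotRequests, int slotsPerTaskManager) {
            if (pendingSlotRequests <= 0) {
                return 0;
            }
            // Integer ceiling division; slots promised by previously started but not
            // yet registered TaskManagers would be subtracted from the pending count first.
            return (pendingSlotRequests + slotsPerTaskManager - 1) / slotsPerTaskManager;
        }
    }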



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-06-14 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513336#comment-16513336
 ] 

Sihua Zhou commented on FLINK-9455:
---

Hi [~till.rohrmann], the more I thought about this, the trickier I found it. 
Since you are probably the person most familiar with this part of the code, and 
I don't want to mess it up, I will leave this issue to you.

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513320#comment-16513320
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user XiaoZYang commented on the issue:

https://github.com/apache/flink/pull/5845
  
@zentol got it! So what you mean is that @sijie and I will keep updating 
commits until this work is done, and you will merge this PR into a new branch; 
later PRs will then be made against that new branch. Did I misunderstand you?


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513319#comment-16513319
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
@zentol, please take a look when you get the chance.

I think testSimpleAfterMatchSkip() is failing 
[here](https://api.travis-ci.org/v3/job/392551505/log.txt) because the program 
arrives at the assert statement before the result stream has been fully added to 
the sink. This test was passing in the Travis CI build of my fork. Maybe we can 
add a CountDownLatch to wait until the array list VALUES reaches the expected 
size? A rich sink function would then count down upon closing. What would you 
suggest?
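
A sketch of that suggestion, simplified and not tied to the actual test classes: 
a rich sink function that counts down a latch on close(), so the test awaits the 
latch instead of racing to the assertion. The latch and list are static because 
sink functions are serialized when the job is submitted.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    class LatchedCollectSink extends RichSinkFunction<String> {
        static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());
        static final CountDownLatch LATCH = new CountDownLatch(1);

        @Override
        public void invoke(String value) {
            VALUES.add(value);
        }

        @Override
        public void close() {
            LATCH.countDown(); // the test awaits this before asserting on VALUES
        }
    }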


> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Description: 
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
release task manager containers in certain specific cases. In the worst case, I 
had a job configured for 5 task managers but ended up with more than 100 
containers. Although the job did not fail, it affected other jobs in the Yarn 
cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources. The container was killed before the restart, but 
*YarnResourceManager* never received the *onContainerComplete* callback, which 
should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
in line 347 of the FlinkYarnProblem log, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
no longer had a connection to the TaskManager on container 24, so it simply 
ignored the close of the TaskManager.

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring 
close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called 
*requestYarnContainer*, which increased the *numPendingContainerRequests* 
variable in *YarnResourceManager* by 1.

Since the return of excess containers is governed by the 
*numPendingContainerRequests* variable in *YarnResourceManager*, this container 
cannot be returned even though it is no longer needed. Meanwhile, the restart 
logic has already allocated enough containers for the Task Managers, so Flink 
holds the extra container for a long time for nothing. 

In the full log, the job ended with 7 containers while only 3 were running 
TaskManagers.

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?

  was:
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
release task manager containers in certain specific cases. In the worst case, I 
had a job configured for 5 task managers but ended up with more than 100 
containers. Although the job did not fail, it affected other jobs in the Yarn 
cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources. The container was killed before the restart, but 
*YarnResourceManager* never received the *onContainerComplete* callback, which 
should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
in line 347 of the FlinkYarnProblem log, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
no longer had a connection to the TaskManager on container 24, so it simply 
ignored the close of the TaskManager.

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring 
close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called 
*requestYarnContainer*, which increased the *numPendingContainerRequests* 
variable in *YarnResourceManager* by 1.

Since the return of excess containers is governed by the 
*numPendingContainerRequests* variable in *YarnResourceManager*, this container 
cannot be returned even though it is no longer needed. Meanwhile, the restart 
logic has already allocated enough containers for the Task Managers, so Flink 
holds the extra container for a long time for nothing. 

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
> release task manager containers in certain specific cases.

[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Description: 
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
release task manager containers in certain specific cases. In the worst case, I 
had a job configured for 5 task managers but ended up with more than 100 
containers. Although the job did not fail, it affected other jobs in the Yarn 
cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources. The container was killed before the restart, but 
*YarnResourceManager* never received the *onContainerComplete* callback, which 
should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
in line 347 of the FlinkYarnProblem log, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
no longer had a connection to the TaskManager on container 24, so it simply 
ignored the close of the TaskManager.

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring 
close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called 
*requestYarnContainer*, which increased the *numPendingContainerRequests* 
variable in *YarnResourceManager* by 1.

Since the return of excess containers is governed by the 
*numPendingContainerRequests* variable in *YarnResourceManager*, this container 
cannot be returned even though it is no longer needed. Meanwhile, the restart 
logic has already allocated enough containers for the Task Managers, so Flink 
holds the extra container for a long time for nothing. 

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?

  was:
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
release task manager containers in certain specific cases. In the worst case, I 
had a job configured for 5 task managers but ended up with more than 100 
containers. Although the job did not fail, it affected other jobs in the Yarn 
cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources. The container was killed before the restart, but 
*YarnResourceManager* never received the *onContainerComplete* callback, which 
should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
in line 347 of the FlinkYarnProblem log, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
no longer had a connection to the TaskManager on container 24, so it simply 
ignored the close of the TaskManager.

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring 
close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called 
*requestYarnContainer*, which increased the *numPendingContainerRequests* 
variable in *YarnResourceManager* by 1.

Since the return of excess containers is governed by the 
*numPendingContainerRequests* variable in *YarnResourceManager*, this container 
cannot be returned even though it is no longer needed. Meanwhile, the restart 
logic has already allocated enough containers for the Task Managers, so Flink 
holds the extra container for a long time for nothing. 

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
> release task manager containers in certain specific cases. In the worst case, I 
> had a job configured for 5 task managers but ended up with more than 100 containers.

[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Description: 
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
release task manager containers in certain specific cases. In the worst case, I 
had a job configured for 5 task managers but ended up with more than 100 
containers. Although the job did not fail, it affected other jobs in the Yarn 
cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources. The container was killed before the restart, but 
*YarnResourceManager* never received the *onContainerComplete* callback, which 
should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
in line 347 of the FlinkYarnProblem log, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
no longer had a connection to the TaskManager on container 24, so it simply 
ignored the close of the TaskManager.

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring 
close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called 
*requestYarnContainer*, which increased the *numPendingContainerRequests* 
variable in *YarnResourceManager* by 1.

Since the return of excess containers is governed by the 
*numPendingContainerRequests* variable in *YarnResourceManager*, this container 
cannot be returned even though it is no longer needed. Meanwhile, the restart 
logic has already allocated enough containers for the Task Managers, so Flink 
holds the extra container for a long time for nothing. 

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?

  was:
After restarting the Job Manager in Yarn Cluster mode, Flink does not release task 
manager containers in certain specific cases.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources, although the Task Manager in the container with id 24 was 
released before the restart. 

But in line 347, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated] 

this problem caused Flink to request one more container than needed. Since the 
return of excess containers is governed by the *numPendingContainerRequests* 
variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
*YarnResourceManager* that called *requestYarnContainer*, which leads to the 
increase of *numPendingContainerRequests*. However, the restart logic has already 
allocated enough containers for the Task Managers, so Flink holds the extra 
container for a long time for nothing. In the worst case, I had a job configured 
for 5 task managers but ended up with more than 100 containers.

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Major
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
> release task manager containers in certain specific cases. In the worst case, I 
> had a job configured for 5 task managers but ended up with more than 100 
> containers. Although the job did not fail, it affected other jobs in the Yarn 
> cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
> in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]

[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Priority: Critical  (was: Major)

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not 
> release task manager containers in certain specific cases. In the worst case, I 
> had a job configured for 5 task managers but ended up with more than 100 
> containers. Although the job did not fail, it affected other jobs in the Yarn 
> cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by Yarn's *AMRMAsyncClient*. After the restart, as we can see 
> in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it 
> no longer had a connection to the TaskManager on container 24, so it simply 
> ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which increased the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this container 
> cannot be returned even though it is no longer needed. Meanwhile, the restart 
> logic has already allocated enough containers for the Task Managers, so Flink 
> holds the extra container for a long time for nothing. 
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Attachment: fulllog.txt

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Major
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink does not release 
> task manager containers in certain specific cases.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources, although the Task Manager in the container with id 24 
> was released before the restart. 
> But in line 347, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated] 
> this problem caused Flink to request one more container than needed. Since the 
> return of excess containers is governed by the *numPendingContainerRequests* 
> variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
> *YarnResourceManager* that called *requestYarnContainer*, which leads to the 
> increase of *numPendingContainerRequests*. However, the restart logic has 
> already allocated enough containers for the Task Managers, so Flink holds the 
> extra container for a long time for nothing. In the worst case, I had a job 
> configured for 5 task managers but ended up with more than 100 containers.
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Attachment: FlinkYarnProblem

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Major
> Attachments: FlinkYarnProblem
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink does not release 
> task manager containers in certain specific cases.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources, although the Task Manager in the container with id 24 
> was released before the restart. 
> But in line 347, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated] 
> this problem caused Flink to request one more container than needed. Since the 
> return of excess containers is governed by the *numPendingContainerRequests* 
> variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
> *YarnResourceManager* that called *requestYarnContainer*, which leads to the 
> increase of *numPendingContainerRequests*. However, the restart logic has 
> already allocated enough containers for the Task Managers, so Flink holds the 
> extra container for a long time for nothing. In the worst case, I had a job 
> configured for 5 task managers but ended up with more than 100 containers.
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Attachment: (was: jobmanager.log)

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Major
> Attachments: FlinkYarnProblem
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink does not release 
> task manager containers in certain specific cases.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources, although the Task Manager in the container with id 24 
> was released before the restart. 
> But in line 347, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated] 
> this problem caused Flink to request one more container than needed. Since the 
> return of excess containers is governed by the *numPendingContainerRequests* 
> variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
> *YarnResourceManager* that called *requestYarnContainer*, which leads to the 
> increase of *numPendingContainerRequests*. However, the restart logic has 
> already allocated enough containers for the Task Managers, so Flink holds the 
> extra container for a long time for nothing. In the worst case, I had a job 
> configured for 5 task managers but ended up with more than 100 containers.
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-9567:
---
Description: 
After restarting the Job Manager in Yarn Cluster mode, Flink does not release task 
manager containers in certain specific cases.

In the first log I posted, the container with id 24 is the reason why Yarn did 
not release resources, although the Task Manager in the container with id 24 was 
released before the restart. 

But in line 347, 

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, 
address is now gated for [50] ms. Reason: [Disassociated] 

this problem caused Flink to request one more container than needed. Since the 
return of excess containers is governed by the *numPendingContainerRequests* 
variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
*YarnResourceManager* that called *requestYarnContainer*, which leads to the 
increase of *numPendingContainerRequests*. However, the restart logic has already 
allocated enough containers for the Task Managers, so Flink holds the extra 
container for a long time for nothing. In the worst case, I had a job configured 
for 5 task managers but ended up with more than 100 containers.

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?

  was:
After restarting the Job Manager in Yarn Cluster mode, Flink does not release task 
manager containers in certain specific cases. According to my observation, the 
reason is that the instance variable *numPendingContainerRequests* in the 
*YarnResourceManager* class does not decrease, since it has not received the 
containers. And after the job manager restarts, *numPendingContainerRequests* is 
increased by the number of task executors. 

Since the callback function *onContainersAllocated* returns excess containers 
immediately only if *numPendingContainerRequests* <= 0, the number of containers 
grows bigger and bigger while only a few act as task managers.

I think it is important to clear the *numPendingContainerRequests* variable after 
restarting the Job Manager, but I am not very clear on how to do that. There is 
no way to decrease *numPendingContainerRequests* except through 
*onContainersAllocated*. Is it fine to add a method to operate on the 
*numPendingContainerRequests* variable? Meanwhile, there is no handle to the 
YarnResourceManager in the *ExecutionGraph* restart logic.

ps: Another strange thing I found is that a request for a yarn container 
sometimes returns many more containers than requested. Is that a normal scenario 
for AMRMAsyncClient?


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Major
> Attachments: jobmanager.log
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink does not release 
> task manager containers in certain specific cases.
> In the first log I posted, the container with id 24 is the reason why Yarn did 
> not release resources, although the Task Manager in the container with id 24 
> was released before the restart. 
> But in line 347, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated] 
> this problem caused Flink to request one more container than needed. Since the 
> return of excess containers is governed by the *numPendingContainerRequests* 
> variable in *YarnResourceManager*, I think it is *onContainersCompleted* in 
> *YarnResourceManager* that called *requestYarnContainer*, which leads to the 
> increase of *numPendingContainerRequests*. However, the restart logic has 
> already allocated enough containers for the Task Managers, so Flink holds the 
> extra container for a long time for nothing. In the worst case, I had a job 
> configured for 5 task managers but ended up with more than 100 containers.
> ps: Another strange thing I found is that a request for a yarn container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9573) Check for leadership with leader session id

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513210#comment-16513210
 ] 

ASF GitHub Bot commented on FLINK-9573:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6154#discussion_r195613671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
 ---
@@ -356,8 +357,8 @@ public void confirmLeaderSessionID(UUID leaderSessionID) {
}
 
@Override
-   public boolean hasLeadership() {
-   return isLeader;
+   public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+   return isLeader && leaderSessionId.equals(currentLeaderSessionId);
}
--- End diff --

Thanks @tillrohrmann, that makes more sense to me now. Yeah, I had not noticed 
that there was a volatile on that field before. Thanks again.
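
To make the fencing concrete, a small self-contained sketch (not the Flink class 
itself): the contender's session id is compared against the volatile current id, 
so a stale leader holding an old UUID fails the check even while isLeader is 
still momentarily true.

    import java.util.UUID;

    class LeadershipCheckSketch {
        private volatile boolean isLeader;
        private volatile UUID currentLeaderSessionId; // the volatile field discussed above

        boolean hasLeadership(UUID leaderSessionId) {
            return isLeader && leaderSessionId.equals(currentLeaderSessionId);
        }
    }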


> Check for leadership with leader session id
> ---
>
> Key: FLINK-9573
> URL: https://issues.apache.org/jira/browse/FLINK-9573
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> In order to check whether a {{LeaderContender}} is still the leader, it is 
> not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. 
> Instead, we should extend this method to also take the leader session id as a 
> parameter to distinguish between different calls from the same leader 
> contender with different leader session ids.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Closed] (FLINK-9590) HistogramDump should be immutable

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9590.
---
Resolution: Fixed

master: 24266fd63b72c76e77ed86e46392f7427e4d1f8b

> HistogramDump should be immutable
> -
>
> Key: FLINK-9590
> URL: https://issues.apache.org/jira/browse/FLINK-9590
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.6.0
>
>
> The {{HistogramDump}} represents the contents of a histogram at one point in 
> time, and should thus not be mutable.
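
A minimal sketch of what an immutable snapshot type looks like; the field set 
here (an id plus recorded values) is an assumption for illustration and may 
differ from Flink's actual HistogramDump.

    final class ImmutableHistogramDump {
        private final String id;
        private final long[] values;

        ImmutableHistogramDump(String id, long[] values) {
            this.id = id;
            this.values = values.clone(); // defensive copy on the way in
        }

        String getId() {
            return id;
        }

        long[] getValues() {
            return values.clone(); // ...and on the way out
        }
    }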



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9591) Remove remnants of distributed-cache logic

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9591.
---
Resolution: Fixed

master: 0dc0e36f204401f14e078d24348421cd4140577d

> Remove remnants of distributed-cache logic
> --
>
> Key: FLINK-9591
> URL: https://issues.apache.org/jira/browse/FLINK-9591
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> There are still some remnants in the python API for the old distributed cache 
> logic, where we uploaded files first to a DFS before registering them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9589) PythonOperationInfo should be immutable

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9589.
---
Resolution: Fixed

master: 2fc6499496296cf2aa2408e4e15d684496a169a2

> PythonOperationInfo should be immutable
> ---
>
> Key: FLINK-9589
> URL: https://issues.apache.org/jira/browse/FLINK-9589
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.2.1, 1.3.3, 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> The {{PythonOperationInfo}} is a simple representation of a dataset operation 
> defined by the python plan. Thus the entire object should be immutable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8744) Add annotations for documenting common/advanced options

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8744.
---
Resolution: Fixed

master: eaff4da15bdd7528dcc0d8a37fd59642cee53850

> Add annotations for documenting common/advanced options
> ---
>
> Key: FLINK-8744
> URL: https://issues.apache.org/jira/browse/FLINK-8744
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.6.0
>
>
> The {{ConfigDocsGenerator}} only generates [the full configuration 
> reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference].
>  The 
> [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options]
>  and 
> [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options]
>  sections are still manually defined in {{config.md}} and thus prone to being 
> outdated / out-of-sync with the full reference.
> I propose adding {{@Common}}/{{@Advanced}} annotations based on which we 
> could generate these sections as well.
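> A sketch of what such annotations and the generation step could look like; 
> the annotation name is taken from the proposal, while the reflective scan is 
> an assumption about how the generator might use it:
> ```
> import java.lang.annotation.ElementType;
> import java.lang.annotation.Retention;
> import java.lang.annotation.RetentionPolicy;
> import java.lang.annotation.Target;
> import java.lang.reflect.Field;
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical marker named after the proposal; not an existing Flink annotation.
> @Retention(RetentionPolicy.RUNTIME)
> @Target(ElementType.FIELD)
> @interface Common {}
>
> class SectionGenerator {
>     // Collects all option fields tagged @Common, so the "common options"
>     // section can be generated instead of hand-maintained in config.md.
>     static List<String> commonOptions(Class<?> optionsClass) {
>         List<String> names = new ArrayList<>();
>         for (Field f : optionsClass.getDeclaredFields()) {
>             if (f.isAnnotationPresent(Common.class)) {
>                 names.add(f.getName());
>             }
>         }
>         return names;
>     }
> }
> ```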



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9530) Task numRecords metrics broken for chains

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513144#comment-16513144
 ] 

ASF GitHub Bot commented on FLINK-9530:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6126


> Task numRecords metrics broken for chains
> -
>
> Key: FLINK-9530
> URL: https://issues.apache.org/jira/browse/FLINK-9530
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The {{numRecordsIn/Out}} metrics for tasks are currently broken. We are 
> wrongly adding up the numRecordsIn/Out metrics for all operators in the 
> chain, instead of just the head/tail operators.
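> Schematically, the fix amounts to the following (plain Java, standing in for 
> Flink's internal metric wiring): a task consumed exactly what the head 
> operator consumed and emitted exactly what the tail operator emitted.
> ```
> import java.util.List;
>
> // Schematic stand-in for an operator's counters.
> class OperatorCounters {
>     final long numRecordsIn;
>     final long numRecordsOut;
>     OperatorCounters(long in, long out) { numRecordsIn = in; numRecordsOut = out; }
> }
>
> class ChainMetrics {
>     // Broken: records handed between chained operators are counted repeatedly.
>     static long taskNumRecordsInBroken(List<OperatorCounters> chain) {
>         return chain.stream().mapToLong(op -> op.numRecordsIn).sum();
>     }
>
>     // Fixed: only the head operator's input counts for the task.
>     static long taskNumRecordsIn(List<OperatorCounters> chain) {
>         return chain.get(0).numRecordsIn;
>     }
>
>     // Fixed: only the tail operator's output counts for the task.
>     static long taskNumRecordsOut(List<OperatorCounters> chain) {
>         return chain.get(chain.size() - 1).numRecordsOut;
>     }
> }
> ```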



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8744) Add annotations for documenting common/advanced options

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513146#comment-16513146
 ] 

ASF GitHub Bot commented on FLINK-8744:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5843


> Add annotations for documenting common/advanced options
> ---
>
> Key: FLINK-8744
> URL: https://issues.apache.org/jira/browse/FLINK-8744
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.6.0
>
>
> The {{ConfigDocsGenerator}} only generates [the full configuration 
> reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference].
>  The 
> [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options]
>  and 
> [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options]
>  sections are still manually defined in {{config.md}} and thus prone to being 
> outdated / out-of-sync with the full reference.
> I propose adding {{@Common}}/{{@Advanced}} annotations based on which we 
> could generate these sections as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9257.
---
   Resolution: Fixed
Fix Version/s: 1.5.1

master: 45ac85e2c4ec66ff02e4277c1781247710252a26

1.5: 0796ac7e212f8d6197e5db95b1a85417767b247c

> End-to-end tests prints "All tests PASS" even if individual test-script 
> returns non-zero exit code
> --
>
> Key: FLINK-9257
> URL: https://issues.apache.org/jira/browse/FLINK-9257
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> In some cases the test-suite exits with a non-zero exit code but still prints 
> "All tests PASS" to stdout. This happens because of how the test runner works, 
> which is roughly as follows:
>  # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of 
> tests consisting of one or multiple bash scripts.
>  # As soon as one of those bash scripts exits with a non-zero exit code, the 
> tests won't continue to run and the test-suite will also exit with a non-zero 
> exit code.
>  # *During the cleanup hook (trap cleanup EXIT in common.sh) it will be 
> checked whether there are non-empty out files or log files with certain 
> exceptions. If a test fails with a non-zero exit code, but does not have any 
> exceptions or .out files, this will still print "All tests PASS" to stdout, 
> even though they don't.*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513145#comment-16513145
 ] 

ASF GitHub Bot commented on FLINK-9257:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6053


> End-to-end tests prints "All tests PASS" even if individual test-script 
> returns non-zero exit code
> --
>
> Key: FLINK-9257
> URL: https://issues.apache.org/jira/browse/FLINK-9257
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> In some cases the test-suite exits with a non-zero exit code but still prints 
> "All tests PASS" to stdout. This happens because of how the test runner works, 
> which is roughly as follows:
>  # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of 
> tests consisting of one or multiple bash scripts.
>  # As soon as one of those bash scripts exits with a non-zero exit code, the 
> tests won't continue to run and the test-suite will also exit with a non-zero 
> exit code.
>  # *During the cleanup hook (trap cleanup EXIT in common.sh) it will be 
> checked whether there are non-empty out files or log files with certain 
> exceptions. If a test fails with a non-zero exit code, but does not have any 
> exceptions or .out files, this will still print "All tests PASS" to stdout, 
> even though they don't.*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6126: [FLINK-9530][metrics] Fix numRecords task metric f...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6126


---


[GitHub] flink pull request #5843: [FLINK-8744][docs] Generate "Common Option" sectio...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5843


---


[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6053


---


[jira] [Closed] (FLINK-9530) Task numRecords metrics broken for chains

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9530.
---
Resolution: Fixed

master: 03d77b862b845fc2b2836fc35fdbc8793d5064b7

1.5: 246ef1d291ef8ba6907c368e1cee42d20a5e617a

> Task numRecords metrics broken for chains
> -
>
> Key: FLINK-9530
> URL: https://issues.apache.org/jira/browse/FLINK-9530
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The {{numRecordsIn/Out}} metrics for tasks are currently broken. We are 
> wrongly adding up the numRecordsIn/Out metrics for all operators in the 
> chain, instead of just the head/tail operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513016#comment-16513016
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
@zentol, thanks, will do!


> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

2018-06-14 Thread deepaks4077
Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
@zentol, thanks, will do!


---


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513012#comment-16513012
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann Thanks for the review! I will go through them and will make 
the changes shortly.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6083: [FLINK-8983] End-to-end test: Confluent schema registry

2018-06-14 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann Thanks for the review! I will go through them and will make 
the changes shortly.


---


[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512985#comment-16512985
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

GitHub user deepaks4077 opened a pull request:

https://github.com/apache/flink/pull/6170

[FLINK-9563]: Using a custom sink function for tests in CEPITCase instead 
of writing to disk

## What is the purpose of the change

This change modifies the CEPITCase integration test to use a custom sink 
function to collect and compare test results, instead of writing them to a 
file. It does not add/remove any constituent tests.

## Brief change log

- Removed Before and After junit annotations
- Added a custom sink function with a static ArrayList to collect and 
compare test results (a minimal sketch of this pattern follows below)
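A minimal sketch of that pattern, assuming Flink's streaming `SinkFunction` 
interface (the class and field names are illustrative, not the actual 
CEPITCase code):

```
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Illustrative test sink: results go into a static, synchronized list because
// sink instances are serialized and distributed, so instance fields would not
// be visible to the asserting test thread.
class CollectingSink<T> implements SinkFunction<T> {
    static final List<Object> RESULTS =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(T value) {
        RESULTS.add(value);
    }
}
```

The test would clear RESULTS before each run, attach the sink via 
`.addSink(new CollectingSink<>())`, and assert on the collected list instead 
of reading files back from disk.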

## Verifying this change

This change is already covered by existing tests, such as CEPITCase, which 
is the end-to-end test of the CEP API.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/deepaks4077/flink FLINK-9563

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6170


commit 8fc629a557af4a56ab7638cc5eb519e163267cdc
Author: Deepak Sharnma 
Date:   2018-06-13T02:41:27Z

[FLINK-9563]: Using a custom sink function for tests in CEPITCase




> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-14 Thread deepaks4077
GitHub user deepaks4077 opened a pull request:

https://github.com/apache/flink/pull/6170

[FLINK-9563]: Using a custom sink function for tests in CEPITCase instead 
of writing to disk

## What is the purpose of the change

This change modifies the CEPITCase integration test to use a custom sink 
function to collect and compare test results, instead of writing them to a 
file. It does not add/remove any constituent tests.

## Brief change log

- Removed Before and After junit annotations
- Added a custom sink function with a static ArrayList to collect and 
compare test results

## Verifying this change

This change is already covered by existing tests, such as CEPITCase, which 
is the end-to-end test of the CEP API.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/deepaks4077/flink FLINK-9563

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6170


commit 8fc629a557af4a56ab7638cc5eb519e163267cdc
Author: Deepak Sharnma 
Date:   2018-06-13T02:41:27Z

[FLINK-9563]: Using a custom sink function for tests in CEPITCase




---


[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9592:
--
Description: 
Hi mates, I got a proposal about functionality of BucketingSink.
  
 During implementation of one of our tasks we got the following need - create a 
meta-file, with the path and additional information about the file, created by 
BucketingSink, when it’s been moved into final place.
 Unfortunately such behaviour is currently not available for us. 
  
 We’ve implemented our own Sink, that provides an opportunity to register 
notifiers, that will be called, when file state is changing, but current API 
doesn’t allow us to add such behaviour using inheritance ...
  
 It seems that such functionality could be useful, and could be a part of the 
BucketingSink API.
 What do you think, should I make a PR?

Sincerely yours,
 *Rinat Sharipov*
 Software Engineer at 1DMP CORE Team
  
 email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
 mobile: +7 (925) 416-37-26
 Clever{color:#4f8f00}DATA{color}
 make your data clever
  
 

  
 Hi,
 I see that could be a useful feature. What exactly now is preventing you from 
inheriting from BucketingSink? Maybe it would be just enough to make the 
BucketingSink easier extendable.
 One thing now that could collide with such feature is that Kostas is now 
working on larger BucketingSink rework/refactor. 
 Piotrek
 
  
 Hi guys, thx for your reply. 
 The following code info is actual for *release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
  
 For now, BucketingSink has the following lifecycle of files
  
 When moving files from opened to pending state:
 # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
exist, and contain opened file, in case, when opened file doesn’t exist, we 
create one, and write item to it
 # on each item (*method* *invoke:434* *line*), we check that suitable opened 
file doesn’t exceed the limits, and if limits are exceeded, we close it and 
move into pending state using *closeCurrentPartFile:568 line - private method*
 # on each timer request (*onProcessingTime:482 line*), we check, if items 
haven't been added to the opened file longer, than specified period of time, we 
close it, using the same private method *closeCurrentPartFile:588 line*

 
 So, the only way, that we have, is to call our hook from 
*closeCurrentPartFile*, that is private, so we copy-pasted the current impl and 
injected our logic there
  
  
 Files are moving from pending state into final, during checkpointing 
lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and contains 
a lot of logic, including discovery of files in pending states, synchronization 
of state access and it’s modification, etc … 
  
 So we couldn’t override it, or call super method and add some logic, because 
when current impl changes the state of files, it removes them from state, and 
we don’t have any opportunity to know, 
 for which files state have been changed.
  
 To solve such problem, we've created the following interface
  
/**
 * The {@code FileStateChangeCallback} is used to perform any additional 
operations, when {@link BucketingSink}
 * moves file from one state to another. For more information about state 
management of {@code BucketingSink}, look
 * through it's official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

 /**
 * Used to perform any additional operations, related with moving of file into 
next state.
 *
 * @param fs provides access for working with file system
 * @param path path to the file, moved into next state
 *
 * @throws IOException if something went wrong, while performing any operations 
with file system
 */
 void call(FileSystem fs, Path path) throws IOException;
}
And have added an ability to register this callbacks in BucketingSink impl in 
the following manner
  
public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
  
 I’m ready to discuss the best ways, how such hooks could be implemented in the 
core impl or any other improvements, that will help us to add such 
functionality into our extension, using public api, instead of copy-pasting the 
source code.
  
 Thx for your help, mates =)
  

Sincerely yours,
 *Rinat Sharipov*
 Software Engineer at 1DMP CORE Team
  
 email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
 mobile: +7 (925) 416-37-26
 Clever{color:#4f8f00}DATA{color}
 make your data clever
  
 ___

[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-14 Thread Mikhail Pryakhin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Pryakhin updated FLINK-9592:

Description: 
Hi mates, I got a proposal about functionality of BucketingSink.
 
During implementation of one of our tasks we got the following need - create a 
meta-file, with the path and additional information about the file, created by 
BucketingSink, when it’s been moved into final place.
Unfortunately such behaviour is currently not available for us. 
 
We’ve implemented our own Sink, that provides an opportunity to register 
notifiers, that will be called, when file state is changing, but current API 
doesn’t allow us to add such behaviour using inheritance ...
 
It seems that such functionality could be useful, and could be a part of the 
BucketingSink API.
What do you think, should I make a PR?

Sincerely yours,
 *Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
mobile: +7 (925) 416-37-26
Clever{color:#4f8f00}DATA{color}
make your data clever
 

 
Hi,
I see that could be a useful feature. What exactly now is preventing you from 
inheriting from BucketingSink? Maybe it would be just enough to make the 
BucketingSink easier extendable.
One thing now that could collide with such feature is that Kostas is now 
working on larger BucketingSink rework/refactor. 
Piotrek

 
Hi guys, thx for your reply. 
The following code info is actual for *release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
 
For now, BucketingSink has the following lifecycle of files
 
When moving files from opened to pending state:
 # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
exist, and contain opened file, in case, when opened file doesn’t exist, we 
create one, and write item to it
 # on each item (*method* *invoke:434* *line*), we check that suitable opened 
file doesn’t exceed the limits, and if limits are exceeded, we close it and 
move into pending state using *closeCurrentPartFile:568 line - private method*
 # on each timer request (*onProcessingTime:482 line*), we check, if items 
haven't been added to the opened file longer, than specified period of time, we 
close it, using the same private method *closeCurrentPartFile:588 line*

 
So, the only way, that we have, is to call our hook from 
*closeCurrentPartFile*, that is private, so we copy-pasted the current impl and 
injected our logic there
 
 
Files are moving from pending state into final, during checkpointing lifecycle, 
in *notifyCheckpointComplete:657 line*, that is public, and contains a lot of 
logic, including discovery of files in pending states, synchronization of state 
access and it’s modification, etc … 
 
So we couldn’t override it, or call super method and add some logic, because 
when current impl changes the state of files, it removes them from state, and 
we don’t have any opportunity to know, 
for which files state have been changed.
 
To solve such problem, we've created the following interface
 
/**
 * The {@code FileStateChangeCallback} is used to perform any additional 
operations, when {@link BucketingSink}
 * moves file from one state to another. For more information about state 
management of {@code BucketingSink}, look
 * through it's official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

 /**
 * Used to perform any additional operations, related with moving of file into 
next state.
 *
 * @param fs provides access for working with file system
 * @param path path to the file, moved into next state
 *
 * @throws IOException if something went wrong, while performing any operations 
with file system
 */
 void call(FileSystem fs, Path path) throws IOException;
}
And have added an ability to register this callbacks in BucketingSink impl in 
the following manner
 
public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
 
I’m ready to discuss the best ways, how such hooks could be implemented in the 
core impl or any other improvements, that will help us to add such 
functionality into our extension, using public api, instead of copy-pasting the 
source code.
 
Thx for your help, mates =)
 

Sincerely yours,
 *Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
mobile: +7 (925) 416-37-26
Clever{color:#4f8f00}DATA{color}
make your data clever
 

[jira] [Created] (FLINK-9592) Notify on moving into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9592:
-

 Summary: Notify on moving into pending/ final state
 Key: FLINK-9592
 URL: https://issues.apache.org/jira/browse/FLINK-9592
 Project: Flink
  Issue Type: New Feature
  Components: filesystem-connector
Reporter: Rinat Sharipov


Hi mates, I got a proposal about functionality of BucketingSink.
 
During implementation of one of our tasks we got the following need - create a 
meta-file, with the path and additional information about the file, created by 
BucketingSink, when it’s been moved into final place.
Unfortunately such behaviour is currently not available for us. 
 
We’ve implemented our own Sink, that provides an opportunity to register 
notifiers, that will be called, when file state is changing, but current API 
doesn’t allow us to add such behaviour using inheritance ...
 
It seems that such functionality could be useful, and could be a part of the 
BucketingSink API.
What do you think, should I make a PR?


Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
mobile: +7 (925) 416-37-26
Clever{color:#4f8f00}DATA{color}
make your data clever
 

 
Hi,
I see that could be a useful feature. What exactly now is preventing you from 
inheriting from BucketingSink? Maybe it would be just enough to make the 
BucketingSink easier extendable.
One thing now that could collide with such feature is that Kostas is now 
working on larger BucketingSink rework/refactor. 
Piotrek

 
Hi guys, thx for your reply. 
The following code info is actual for *release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
 
For now, BucketingSink has the following lifecycle of files
 
When moving files from opened to pending state:
 # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
exist, and contain opened file, in case, when opened file doesn’t exist, we 
create one, and write item to it
 # on each item (*method* *invoke:434* *line*), we check that suitable opened 
file doesn’t exceed the limits, and if limits are exceeded, we close it and 
move into pending state using *closeCurrentPartFile:568 line - private method*
 # on each timer request (*onProcessingTime:482 line*), we check, if items 
haven't been added to the opened file longer, than specified period of time, we 
close it, using the same private method *closeCurrentPartFile:588 line*

 
So, the only way, that we have, is to call our hook from 
*closeCurrentPartFile*, that is private, so we copy-pasted the current impl and 
injected our logic there
 
 
Files are moving from pending state into final, during checkpointing lifecycle, 
in *notifyCheckpointComplete:657 line*, that is public, and contains a lot of 
logic, including discovery of files in pending states, synchronization of state 
access and it’s modification, etc … 
 
So we couldn’t override it, or call super method and add some logic, because 
when current impl changes the state of files, it removes them from state, and 
we don’t have any opportunity to know, 
for which files state have been changed.
 
To solve such problem, we've created the following interface
 
/**
 * The {@code FileStateChangeCallback} is used to perform any additional 
operations, when {@link BucketingSink}
 * moves file from one state to another. For more information about state 
management of {@code BucketingSink}, look
 * through it's official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

 /**
 * Used to perform any additional operations, related with moving of file into 
next state.
 *
 * @param fs provides access for working with file system
 * @param path path to the file, moved into next state
 *
 * @throws IOException if something went wrong, while performing any operations 
with file system
 */
 void call(FileSystem fs, Path path) throws IOException;
}
And have added an ability to register this callbacks in BucketingSink impl in 
the following manner
 
public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
 
I’m ready to discuss the best ways, how such hooks could be implemented in the 
core impl or any other improvements, that will help us to add such 
functionality into our extension, using public api, instead of copy-pasting the 
source code.
 
Thx for your help, mates =)
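To make the intended use concrete, here is a sketch of a callback that writes 
the meta-file described above. It assumes the FileStateChangeCallback 
interface and register methods are adopted as proposed (they are not part of 
the released BucketingSink) and uses Flink's core file system API for 
illustration:

```
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Hypothetical callback against the proposed API above.
class MetaFileCallback implements FileStateChangeCallback {
    @Override
    public void call(FileSystem fs, Path path) throws IOException {
        // Drop a marker file next to the finalized part file so downstream
        // consumers can discover completed files.
        Path meta = new Path(path.getParent(), path.getName() + ".meta");
        fs.create(meta, FileSystem.WriteMode.NO_OVERWRITE).close();
    }
}
```

It would then be registered via 
registerOnFinalStateChangeCallback(new MetaFileCallback()).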
 


Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: [r.shari...@cleverdata.ru|mailto:a.totma...@

[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9592:
--
Summary: Notify on moving file into pending/ final state  (was: Notify on 
moving into pending/ final state)

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Priority: Major
>
> Hi mates, I got a proposal about functionality of BucketingSink.
>  
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
>  
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful, and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that could be a useful feature. What exactly now is preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier extendable.
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> For now, BucketingSink has the following lifecycle of files
>  
> When moving files from opened to pending state:
>  # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
> exist, and contain opened file, in case, when opened file doesn’t exist, we 
> create one, and write item to it
>  # on each item (*method* *invoke:434* *line*), we check that suitable opened 
> file doesn’t exceed the limits, and if limits are exceeded, we close it and 
> move into pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check, if items 
> haven't been added to the opened file longer, than specified period of time, 
> we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way, that we have, is to call our hook from 
> *closeCurrentPartFile*, that is private, so we copy-pasted the current impl 
> and injected our logic there
>  
>  
> Files are moving from pending state into final, during checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and it’s modification, etc … 
>  
> So we couldn’t override it, or call super method and add some logic, because 
> when current impl changes the state of files, it removes them from state, and 
> we don’t have any opportunity to know, 
> for which files state have been changed.
>  
> To solve such problem, we've created the following interface
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through it's official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>  /**
>  * Used to perform any additional operations, related with moving of file 
> into next state.
>  *
>  * @param fs provides access for working with file system
>  * @param path path to the file, moved into next state
>  *
>  * @throws IOException if something went wrong, while performing any 
> operations with file system
>  */
>  void call(FileSystem fs, Path path) throws IOException;
> }
> And have added an ability to register this callbacks in BucketingSink impl in 
> the following manner
>  
> public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink 
> registerOnPendingStateChangeCallback(FileStateChange

[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs

2018-06-14 Thread Deepak Sharma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512898#comment-16512898
 ] 

Deepak Sharma commented on FLINK-9380:
--

Hi [~till.rohrmann], would you mind giving examples? I might be able to take a look.

> Failing end-to-end tests should not clean up logs
> -
>
> Key: FLINK-9380
> URL: https://issues.apache.org/jira/browse/FLINK-9380
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> Some of the end-to-end tests clean up their logs also in the failure case. 
> This makes debugging and understanding the problem extremely difficult. 
> Ideally, the scripts should say where they stored the respective logs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-14 Thread etiennecarriere
Github user etiennecarriere commented on the issue:

https://github.com/apache/flink/pull/6149
  
@pnowojski, I added a unit test which validates the rate limiting feature, 
but: 
* It adds 20s to the unit tests. Would it be better to move it to an 
integration test (even if it is not strictly an integration test)?
* I take 10 seconds for each test to have some stability.

Open to your suggestions. 


---


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512891#comment-16512891
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user etiennecarriere commented on the issue:

https://github.com/apache/flink/pull/6149
  
@pnowojski, I added a unit test which validates the rate limiting feature, 
but: 
* It adds 20s to the unit tests. Would it be better to move it to an 
integration test (even if it is not strictly an integration test)?
* I take 10 seconds for each test to have some stability.

Open to your suggestions. 


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints all the bandwidth is 
> taken to send the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limitations on FileSystem (mostly 
> the number of connections, with the tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the Input and OutputStream.
>  
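> A sketch of the wrapping pattern, assuming Guava's RateLimiter is acceptable 
> for the throttling itself (this is not the LimitedConnectionsFileSystem 
> patch, just an illustration of the idea):
> ```
> import java.io.IOException;
> import java.io.OutputStream;
>
> import com.google.common.util.concurrent.RateLimiter;
>
> // Every write acquires permits ("bytes") from a rate limiter; the proposal
> // would share one limiter per FileSystem, here it is per-stream for brevity.
> class RateLimitedOutputStream extends OutputStream {
>     private final OutputStream delegate;
>     private final RateLimiter limiter;
>
>     RateLimitedOutputStream(OutputStream delegate, double bytesPerSecond) {
>         this.delegate = delegate;
>         this.limiter = RateLimiter.create(bytesPerSecond);
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         limiter.acquire(1);
>         delegate.write(b);
>     }
>
>     @Override
>     public void write(byte[] buf, int off, int len) throws IOException {
>         if (len > 0) {
>             limiter.acquire(len); // block until len "byte permits" are available
>         }
>         delegate.write(buf, off, len);
>     }
>
>     @Override
>     public void close() throws IOException {
>         delegate.close();
>     }
> }
> ```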



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512865#comment-16512865
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5845
  
it is not possible to create a PR against a non-existing branch. We will 
either have to create a pulsar-connector branch up front, or open the first PR 
against master and merge it into a new branch. I would suggest the latter 
option.


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5845
  
it is not possible to create a PR against a non-existing branch. We will 
either have to create a pulsar-connector branch up front, or open the first PR 
against master and merge it into a new branch. I would suggest the latter 
option.


---


[jira] [Assigned] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-5860:
---

Assignee: Mahesh Senniappan  (was: Yaroslav Mykhaylov)

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Mahesh Senniappan
>Priority: Major
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project. It will 
> yield a list of unit tests. Replace all file creation from `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>FLINK_HDFS
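> ```
> In JUnit 4 terms the requested replacement looks as follows (a minimal 
> sketch, not any specific test from the list above):
> ```
> import static org.junit.Assert.assertTrue;
>
> import java.io.File;
>
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.rules.TemporaryFolder;
>
> public class TempDirExampleTest {
>
>     // TemporaryFolder creates a fresh directory per test and deletes it
>     // afterwards, replacing ad-hoc files under java.io.tmpdir.
>     @Rule
>     public final TemporaryFolder tempFolder = new TemporaryFolder();
>
>     @Test
>     public void createsFileInManagedTempDir() throws Exception {
>         // was: new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar")
>         File out = new File(tempFolder.newFolder(), "jarcreatortest.jar");
>         assertTrue(out.createNewFile());
>     }
> }
> ```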

[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-06-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512860#comment-16512860
 ] 

Chesnay Schepler commented on FLINK-5860:
-

I'll assign it to you.

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>Priority: Major
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project. It will 
> yield a list of unit tests. Replace all file creation from `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>  

[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-06-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512859#comment-16512859
 ] 

Chesnay Schepler commented on FLINK-5860:
-

[~maheshsenni] yes, I believe this issue is still valid.

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>Priority: Major
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project. It will 
> yield a list of unit tests. Replace all file creation from `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.

[jira] [Comment Edited] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-06-14 Thread Mahesh Senniappan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502481#comment-16502481
 ] 

Mahesh Senniappan edited comment on FLINK-5860 at 6/14/18 6:35 PM:
---

Hello [~fhue...@gmail.com],

Is this still available to work on? I would like to pick this up if it is 
available.

Edit: Added Fabian for visibility.


was (Author: maheshsenni):
Hello, is there someone working on this? If not, would you mind if I have a go 
at this?

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>Priority: Major
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project. It will 
> yield a list of unit tests. Replace all file creation from `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/f
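
For anyone picking this up, the change has the same mechanical shape everywhere; a minimal sketch assuming plain JUnit 4 (the class and file names here are illustrative, not taken from the Flink code base):

```
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TempDirMigrationSketch {

	// Before: File tempDir = new File(System.getProperty("java.io.tmpdir"));
	// Such files leak into the shared OS temp directory and across test runs.

	// After: JUnit creates a fresh directory per test and deletes it afterwards.
	@Rule
	public final TemporaryFolder tempFolder = new TemporaryFolder();

	@Test
	public void writesIntoManagedTempDir() throws Exception {
		File tempDir = tempFolder.newFolder();
		File out = new File(tempDir, "jarcreatortest.jar");
		// ... exercise the code under test with 'out' ...
	}
}
```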

[jira] [Created] (FLINK-9591) Remove remnants of distributed-cache logic

2018-06-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9591:
---

 Summary: Remove remnants of distributed-cache logic
 Key: FLINK-9591
 URL: https://issues.apache.org/jira/browse/FLINK-9591
 Project: Flink
  Issue Type: Bug
  Components: Python API
Affects Versions: 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0


There are still some remnants of the old distributed-cache logic in the Python 
API, where files were first uploaded to a DFS before being registered.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section

2018-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5843
  
merging.


---


[jira] [Commented] (FLINK-8744) Add annotations for documenting common/advanced options

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512814#comment-16512814
 ] 

ASF GitHub Bot commented on FLINK-8744:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5843
  
merging.


> Add annotations for documenting common/advanced options
> ---
>
> Key: FLINK-8744
> URL: https://issues.apache.org/jira/browse/FLINK-8744
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.6.0
>
>
> The {{ConfigDocsGenerator}} only generates [the full configuration 
> reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference].
>  The 
> [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options]
>  and 
> [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options]
>  sections are still manually defined in {{config.md}} and thus prone to being 
> outdated / out-of-sync with the full reference.
> I propose adding {{@Common}}/{{@Advanced}} annotations based on which we 
> could generate these sections as well.
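
A sketch of what such markers could look like (the annotation names are illustrative, not the final API):

```
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// The ConfigDocsGenerator could scan ConfigOption fields for these markers
// and emit the "common" and "advanced" sections from the same source of
// truth as the full reference.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Common {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Advanced {}
```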



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9530) Task numRecords metrics broken for chains

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512801#comment-16512801
 ] 

ASF GitHub Bot commented on FLINK-9530:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6126
  
merging.


> Task numRecords metrics broken for chains
> -
>
> Key: FLINK-9530
> URL: https://issues.apache.org/jira/browse/FLINK-9530
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The {{numRecordsIn/Out}} metrics for tasks is currently broken. We are 
> wrongly adding up the numRecordsIn/Out metrics for all operators in the 
> chain, instead of just the head/tail operators.
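
To make the arithmetic concrete, a toy sketch of the wrong vs. intended aggregation (illustrative only, not the actual metric code):

```
public class ChainMetricsSketch {
	public static void main(String[] args) {
		// numRecordsOut reported by each operator of one chained task:
		// source -> map -> sink, each forwarding the same 100 records.
		long[] numRecordsOutPerOperator = {100L, 100L, 100L};

		// Broken: summing over the whole chain triples the task-level count.
		long wrongSum = 0;
		for (long n : numRecordsOutPerOperator) {
			wrongSum += n;
		}
		System.out.println(wrongSum); // 300

		// Intended: the task's numRecordsOut is that of the tail operator only.
		long correct = numRecordsOutPerOperator[numRecordsOutPerOperator.length - 1];
		System.out.println(correct); // 100
	}
}
```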



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...

2018-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6126
  
merging.


---


[jira] [Commented] (FLINK-9257) End-to-end tests print "All tests PASS" even if individual test-script returns non-zero exit code

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512796#comment-16512796
 ] 

ASF GitHub Bot commented on FLINK-9257:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6053
  
merging.


> End-to-end tests print "All tests PASS" even if individual test-script 
> returns non-zero exit code
> --
>
> Key: FLINK-9257
> URL: https://issues.apache.org/jira/browse/FLINK-9257
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Critical
> Fix For: 1.6.0
>
>
> In some cases the test-suite exits with a non-zero exit code but still prints 
> "All tests PASS" to stdout. This happens because of how the test runner works, 
> which is roughly as follows:
>  # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of 
> tests consisting of one or multiple bash scripts.
>  # As soon as one of those bash scripts exits with a non-zero exit code, the 
> tests won't continue to run and the test-suite will also exit with a non-zero 
> exit code.
>  # *During the cleanup hook (trap cleanup EXIT in common.sh) it is checked 
> whether there are non-empty .out files or log files with certain exceptions. 
> If a test fails with a non-zero exit code but does not produce any exceptions 
> or .out files, the hook will still print "All tests PASS" to stdout, even 
> though the tests did not pass.*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass" messag...

2018-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6053
  
merging.


---


[jira] [Created] (FLINK-9590) HistogramDump should be immutable

2018-06-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9590:
---

 Summary: HistogramDump should be immutable
 Key: FLINK-9590
 URL: https://issues.apache.org/jira/browse/FLINK-9590
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0


The {{HistogramDump}} represents the contents of a histogram at one point in 
time, and should thus not be mutable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512699#comment-16512699
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user XiaoZYang commented on the issue:

https://github.com/apache/flink/pull/5845
  
@hsaputra There is no error, but the issue is: 
 1. When I create a PR to the flink repo from my fork, I need to choose 
a branch into which the commits will be merged.
2. I'm expected to merge the commits into a branch named something like 
'pulsar-connector', but I can't create a new branch in the flink repo.
Is there a better way to do that?

Thank you for helping me!


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-06-14 Thread XiaoZYang
Github user XiaoZYang commented on the issue:

https://github.com/apache/flink/pull/5845
  
@hsaputra There is no error, but the issue is: 
 1. When I create a PR to the flink repo from my fork, I need to choose 
a branch into which the commits will be merged.
2. I'm expected to merge the commits into a branch named something like 
'pulsar-connector', but I can't create a new branch in the flink repo.
Is there a better way to do that?

Thank you for helping me!


---


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512689#comment-16512689
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user hsaputra commented on the issue:

https://github.com/apache/flink/pull/5845
  
@XiaoZYang You can close this PR and create a new branch to submit a new PR, 
since you are the creator of this one.
Did you see any error or anything else preventing you from closing this one?


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-06-14 Thread hsaputra
Github user hsaputra commented on the issue:

https://github.com/apache/flink/pull/5845
  
@XiaoZYang You can close this PR and create a new branch to submit a new PR, 
since you are the creator of this one.
Did you see any error or anything else preventing you from closing this one?


---


[jira] [Commented] (FLINK-9494) Race condition in Dispatcher with concurrent granting and revoking of leadership

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512686#comment-16512686
 ] 

ASF GitHub Bot commented on FLINK-9494:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6155


> Race condition in Dispatcher with concurrent granting and revoking of 
> leadership
> -
>
> Key: FLINK-9494
> URL: https://issues.apache.org/jira/browse/FLINK-9494
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The {{Dispatcher}} contains a race condition when an instance is granted 
> leadership and then quickly afterwards gets the leadership revoked. The 
> problem is that we don't check in the recovered jobs future callback that we 
> still have the leadership. This can lead to a corrupted state of the 
> {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9573) Check for leadership with leader session id

2018-06-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9573.

   Resolution: Fixed
Fix Version/s: 1.5.1
   1.6.0

Fixed via
1.6.0: 363de6b643689f64564270857f1daf7f6c59257f
1.5.1: 7286d0bc90b23e070a5e91e84325ad6a92f09bfa

> Check for leadership with leader session id
> ---
>
> Key: FLINK-9573
> URL: https://issues.apache.org/jira/browse/FLINK-9573
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> In order to check whether a {{LeaderContender}} is still the leader, it is 
> not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. 
> Instead, we should extend this method to also take the leader session id as a 
> parameter to distinguish between different calls from the same leader 
> contender with different leader session ids.
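
A minimal sketch of the extended contract and the guard it enables (only `hasLeadership` is from the issue; the surrounding names are illustrative):

```
import java.util.UUID;

interface LeaderElectionServiceSketch {
	// Before: boolean hasLeadership();
	// After: callers must present the session id they were granted.
	boolean hasLeadership(UUID leaderSessionId);
}

class GuardedCallbackSketch {
	private final LeaderElectionServiceSketch leaderElectionService;
	private final UUID confirmedLeaderSessionId;

	GuardedCallbackSketch(LeaderElectionServiceSketch service, UUID sessionId) {
		this.leaderElectionService = service;
		this.confirmedLeaderSessionId = sessionId;
	}

	void onRecoveredJobs(Runnable applyRecoveredJobs) {
		// Guards against races like FLINK-9494: leadership may have been
		// revoked (or re-granted under a new session id) in the meantime.
		if (leaderElectionService.hasLeadership(confirmedLeaderSessionId)) {
			applyRecoveredJobs.run();
		}
	}
}
```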



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9573) Check for leadership with leader session id

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512687#comment-16512687
 ] 

ASF GitHub Bot commented on FLINK-9573:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6154


> Check for leadership with leader session id
> ---
>
> Key: FLINK-9573
> URL: https://issues.apache.org/jira/browse/FLINK-9573
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> In order to check whether a {{LeaderContender}} is still the leader, it is 
> not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. 
> Instead, we should extend this method to also take the leader session id as a 
> parameter to distinguish between different calls from the same leader 
> contender with different leader session ids.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6154: [FLINK-9573] Extend LeaderElectionService#hasLeade...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6154


---


[jira] [Closed] (FLINK-9494) Race condition in Dispatcher with concurrent granting and revoking of leadership

2018-06-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9494.

Resolution: Fixed

Fixed via
1.6.0: 7e51b90d909c6feaac6ed48140df00372e95a45c
1.5.1: a9787d7fb616d231007b7c82e2f5f526370eddf8

> Race condition in Dispatcher with concurrent granting and revoking of 
> leadership
> -
>
> Key: FLINK-9494
> URL: https://issues.apache.org/jira/browse/FLINK-9494
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The {{Dispatcher}} contains a race condition when an instance is granted 
> leadership and then quickly afterwards gets the leadership revoked. The 
> problem is that we don't check in the recovered jobs future callback that we 
> still have the leadership. This can lead to a corrupted state of the 
> {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6155


---


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512682#comment-16512682
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user XiaoZYang commented on the issue:

https://github.com/apache/flink/pull/5845
  
I prefer to open a new PR and a new branch. But I am not authorized to do 
that. 


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-06-14 Thread XiaoZYang
Github user XiaoZYang commented on the issue:

https://github.com/apache/flink/pull/5845
  
I prefer to open a new PR and a new branch. But I am not authorized to do 
that. 


---


[jira] [Created] (FLINK-9589) PythonOperationInfo should be immutable

2018-06-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9589:
---

 Summary: PythonOperationInfo should be immutable
 Key: FLINK-9589
 URL: https://issues.apache.org/jira/browse/FLINK-9589
 Project: Flink
  Issue Type: Bug
  Components: Python API
Affects Versions: 1.4.2, 1.5.0, 1.3.3, 1.2.1, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0


The {{PythonOperationInfo}} is a simple representation of a dataset operation 
defined by the python plan. Thus the entire object should be immutable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512652#comment-16512652
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464823
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>tech.allegro.schema.json2avro</groupId>
+			<artifactId>converter</artifactId>
+			<version>${json2avro.version}</version>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>build-jar</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<build>
+				<resources>
+					<resource>
+						<directory>src/main/resources/${resource.dir}</directory>
+						<filtering>true</filtering>
+					</resource>
+				</resources>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<version>2.4.1</version>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<artifactSet>
+										<excludes>
+											<exclude>org.apache.flink:force-shading</exclude>
+											<exclude>com.google.code.findbugs:jsr305</exclude>
+											<exclude>org.slf4j:*</exclude>
+										</excludes>
+									</artifactSet>

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512661#comment-16512661
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195471771
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
--- End diff --

Same here with non-serializable fields which should be marked as 
`transient`.
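
For reference, a sketch of the suggested pattern (configuration stays serializable, the Confluent client objects are rebuilt lazily on the task managers; the class name is illustrative):

```
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.io.Serializable;

public class LazySerializerHolder implements Serializable {

	private static final long serialVersionUID = 1L;

	// Serializable configuration shipped with the schema object.
	private final String schemaRegistryUrl;
	private final int identityMapCapacity;

	// Not serializable, hence transient; null again after deserialization.
	private transient KafkaAvroSerializer kafkaAvroSerializer;

	public LazySerializerHolder(String schemaRegistryUrl, int identityMapCapacity) {
		this.schemaRegistryUrl = schemaRegistryUrl;
		this.identityMapCapacity = identityMapCapacity;
	}

	public KafkaAvroSerializer get() {
		if (kafkaAvroSerializer == null) {
			kafkaAvroSerializer = new KafkaAvroSerializer(
				new CachedSchemaRegistryClient(schemaRegistryUrl, identityMapCapacity));
		}
		return kafkaAvroSerializer;
	}
}
```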


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512668#comment-16512668
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195474165
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
--- End diff --

The serialization format should not need to know about the `topicName` 
which is used here for the schema lookup. Better to use `KafkaAvroEncoder` 
which automatically retrieves the schema name.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512648#comment-16512648
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195447395
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/TestAvroConsumerConfluent.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+
+import example.avro.User;
+
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kafka with 
Confluent Schema Registry.
+ * This will read Avro messages from the input topic and parse them into a POJO type by checking the schema against the Schema Registry.
+ * Then this example publishes the POJO type back to Kafka by converting the POJO to Avro and verifying the schema.
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url 
http://localhost:8081 --group.id myconsumer
+ */
+public class TestAvroConsumerConfluent {
+
+   public static void main(String[] args) throws Exception {
+   Properties config = new Properties();
+   // parse input arguments
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   if (parameterTool.getNumberOfParameters() < 6) {
+   System.out.println("Missing parameters!\n" +
+   "Usage: Kafka --input-topic  
--output-topic  " +
+   "--bootstrap.servers  " +
+   "--zookeeper.connect  " +
+   "--schema-registry-url  --group.id ");
+   return;
+   }
+   config.setProperty("bootstrap.servers", 
parameterTool.getRequired("bootstrap.servers"));
+   config.setProperty("group.id", 
parameterTool.getRequired("group.id"));
+   config.setProperty("zookeeper.connect", 
parameterTool.getRequired("zookeeper.connect"));
+   String schemaRegistryUrl = 
parameterTool.getRequired("schema-registry-url");
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().disableSysoutLogging();
+   DataStreamSource<User> input = env
+   .addSource(
+   new FlinkKafkaConsumer010<User>(
+   parameterTool.getRequired("input-topic"),
+   new AvroDeserializationConfluentSchema(User.class, schemaRegistryUrl),
--- End diff --

raw usage of `AvroDeserializationConfluentSchema` better to use 
`AvroDeserializationConfluentSchema<>`. Then we can also remove `User` in 
`FlinkKafkaConsumer010`.
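
Spelled out, the suggestion is roughly this (a sketch reusing the `env`, `parameterTool`, `config`, and `User` names from the quoted snippet):

```
DataStreamSource<User> input = env.addSource(
	new FlinkKafkaConsumer010<>(
		parameterTool.getRequired("input-topic"),
		new AvroDeserializationConfluentSchema<>(User.class, schemaRegistryUrl),
		config));
```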


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro ty

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512649#comment-16512649
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195453836
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
--- End diff --

I think we can replace the two Scala dependencies above with
```
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
	<version>${project.version}</version>
</dependency>
```


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512666#comment-16512666
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473241
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
--- End diff --

Why do we have to make this schema lookup here? Shouldn't the 
`kafkaAvroSerializer` do the job for us?
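
For comparison, the Confluent serializer derives the `<topic>-value` subject on its own; a sketch of the direct call (using the `kafkaAvroSerializer` and `topicName` fields from the quoted code, and assuming `record` is the Avro record to write):

```
// KafkaAvroSerializer resolves and registers the schema for the
// "topicName-value" subject itself, so the explicit
// getLatestSchemaMetadata() round-trip should not be needed:
byte[] payload = kafkaAvroSerializer.serialize(topicName, record);
```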


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512656#comment-16512656
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465238
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class<T> avroType;
+   private final String schemaRegistryUrl;
+   private final int identityMapCapacity;
+   private KafkaAvroDecoder kafkaAvroDecoder;
--- End diff --

Not serializable. Please mark as `transient`.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512670#comment-16512670
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195477424
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh ---
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+echo "Output from Flink program does not match expected output."
+echo -e "EXPECTED FOR KEY: --$expected--"
+echo -e "ACTUAL: --$2--"
+PASS=""
+exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -Pbuild-jar -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" 
$FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/flink-confluent-schema-registry-1.6-SNAPSHOT.jar
+

+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'

+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'

+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'

+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": 
\"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": 
\"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
--- End diff --

Do we have to manually register the schema or does it also work with 
sending the first record to the input topic?


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies th

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512667#comment-16512667
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473522
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema implements 
SerializationSchema {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
--- End diff --

Could be final, I guess.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512650#comment-16512650
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464245
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
--- End diff --

Why do we need this dependency? Can't we use Flink's Avro serializer?


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512646#comment-16512646
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195463614
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
--- End diff --

For what do we need this dependency? Ideally, we should drop it.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512654#comment-16512654
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464899
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>tech.allegro.schema.json2avro</groupId>
+			<artifactId>converter</artifactId>
+			<version>${json2avro.version}</version>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>build-jar</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<build>
+				<resources>
+					<resource>
+						<directory>src/main/resources/${resource.dir}</directory>
+						<filtering>true</filtering>
+					</resource>
+				</resources>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<version>2.4.1</version>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<artifactSet>
+										<excludes>
+											<exclude>org.apache.flink:force-shading</exclude>
+											<exclude>com.google.code.findbugs:jsr305</exclude>
+											<exclude>org.slf4j:*</exclude>
+										</excludes>
+									</artifactSet>

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512659#comment-16512659
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195466309
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class<T> avroType;
+   private final String schemaRegistryUrl;
+   private final int identityMapCapacity;
+   private KafkaAvroDecoder kafkaAvroDecoder;
+
+   private ObjectMapper mapper;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistyUrl) {
+   this(avroType, schemaRegistyUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
+   }
+
+   public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity) {
+   this.avroType = avroType;
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   }
+
+   @Override
+   public T deserialize(byte[] message) throws IOException {
+   if (kafkaAvroDecoder == null) {
+   SchemaRegistryClient schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroDecoder = new 
KafkaAvroDecoder(schemaRegistryClient);
+   }
+   if (mapper == null) {
+   this.mapper = new ObjectMapper();
+   }
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+   GenericData.Record record = (GenericData.Record) 
this.kafkaAvroDecoder.fromBytes(message);
+   byte[] messageBytes = jsonAvroConverter.convertToJson(record);
+   return (T) this.mapper.readValue(messageBytes, avroType);
--- End diff --

Why do we have to introduce this indirection with Json? Converting an Avro 
record into Json and then into the specific types seems quite cumbersome. 
Better to directly read the record from the `message`.
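
One way to read this suggestion, sketched under assumptions: the target type is a generated Avro `SpecificRecord` class, Confluent's `KafkaAvroDeserializer` (4.x API) replaces the JSON round trip, and `DirectAvroDecoder` is a hypothetical name used only for illustration:

import java.io.IOException;
import java.util.Collections;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

/** Sketch: decode Confluent-framed Avro bytes straight into the generated type. */
public class DirectAvroDecoder<T> {

	private final Class<T> avroType;
	private final String schemaRegistryUrl;
	private final int identityMapCapacity;
	private KafkaAvroDeserializer deserializer;

	public DirectAvroDecoder(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity) {
		this.avroType = avroType;
		this.schemaRegistryUrl = schemaRegistryUrl;
		this.identityMapCapacity = identityMapCapacity;
	}

	public T deserialize(byte[] message) throws IOException {
		if (deserializer == null) {
			SchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, identityMapCapacity);
			// "specific.avro.reader" makes the deserializer return SpecificRecord instances.
			deserializer = new KafkaAvroDeserializer(client,
				Collections.singletonMap(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
		}
		// The topic may be null here: the writer schema id is embedded in the message bytes.
		return avroType.cast(deserializer.deserialize(null, message));
	}
}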


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512662#comment-16512662
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472317
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+	}
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+		this.topicName = topicName;
+		this.avroType = avroType;
+	}
+
+	@Override
+	public byte[] serialize(T obj) {
+		byte[] serializedBytes = null;
+
+		try {
+			if (kafkaAvroSerializer == null) {
+				this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+				this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
+			}
+
+			String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+			if (jsonAvroConverter == null) {
+				jsonAvroConverter = new JsonAvroConverter();
+			}
+
+			//System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema);
--- End diff --

Remove unused code


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512645#comment-16512645
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195452831
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
--- End diff --

Could be inherited from the parent.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512658#comment-16512658
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465492
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
--- End diff --

`final` would be better


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512669#comment-16512669
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473423
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
--- End diff --

This field is not used.


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512664#comment-16512664
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472677
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+	}
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+		this.topicName = topicName;
+		this.avroType = avroType;
+	}
+
+	@Override
+	public byte[] serialize(T obj) {
+		byte[] serializedBytes = null;
+
+		try {
+			if (kafkaAvroSerializer == null) {
+				this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+				this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
+			}
+
+			String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+			if (jsonAvroConverter == null) {
+				jsonAvroConverter = new JsonAvroConverter();
+			}
+
+			//System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema);
+			GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new Schema.Parser().parse(schema));
--- End diff --

Why can't we simply pass `obj` to the `kafkaAvroSerializer`?
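
A minimal sketch of that suggestion, assuming `obj` is an Avro `SpecificRecord` (it carries its own schema, so neither the registry lookup nor the JSON conversion is needed); `DirectAvroEncoder` is a hypothetical name for illustration:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;

/** Sketch: hand the record itself to KafkaAvroSerializer. */
public class DirectAvroEncoder {

	private final KafkaAvroSerializer serializer;
	private final String topicName;

	public DirectAvroEncoder(String schemaRegistryUrl, int identityMapCapacity, String topicName) {
		this.serializer = new KafkaAvroSerializer(
			new CachedSchemaRegistryClient(schemaRegistryUrl, identityMapCapacity));
		this.topicName = topicName;
	}

	public byte[] serialize(SpecificRecord obj) {
		// KafkaAvroSerializer registers the record's own schema under "<topic>-value"
		// and writes the Confluent wire format (magic byte + schema id + Avro body).
		return serializer.serialize(topicName, obj);
	}
}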


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512660#comment-16512660
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195469740
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
+	private final String schemaRegistryUrl;
+	private final int identityMapCapacity;
+	private KafkaAvroDecoder kafkaAvroDecoder;
+
+	private ObjectMapper mapper;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
+	}
+
+	public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity) {
+		this.avroType = avroType;
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+	}
+
+	@Override
+	public T deserialize(byte[] message) throws IOException {
+		if (kafkaAvroDecoder == null) {
+			SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+			this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistryClient);
--- End diff --

Why do we use the `KafkaAvroDecoder` instead of the `KafkaAvroDeserializer`?
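
For context, a hedged side-by-side of the two APIs (Confluent 4.x; `client` and `message` stand in for the schema registry client and the raw bytes from the surrounding diff):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class DecoderVsDeserializer {
	public static void decodeBothWays(SchemaRegistryClient client, byte[] message) {
		// Old-style decoder from the legacy kafka.serializer API:
		KafkaAvroDecoder decoder = new KafkaAvroDecoder(client);
		Object viaDecoder = decoder.fromBytes(message);

		// Current org.apache.kafka.common.serialization.Deserializer API;
		// the topic argument is not needed to resolve the writer schema.
		KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(client);
		Object viaDeserializer = deserializer.deserialize(null, message);
	}
}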


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512647#comment-16512647
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195452794
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
--- End diff --

Nowhere used


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512671#comment-16512671
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195476574
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/example/avro/User.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package example.avro;
+
+/**
+ * Autogenerated by Avro
+ *  DO NOT EDIT DIRECTLY 
+ */
+
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
+/**
+**/
+
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
--- End diff --

Can we automatically generate these classes by using Maven's 
`generate-sources` phase?


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512651#comment-16512651
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464034
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
--- End diff --

For what do we need this dependency?


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512663#comment-16512663
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472489
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+	}
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+		this.topicName = topicName;
+		this.avroType = avroType;
+	}
+
+	@Override
+	public byte[] serialize(T obj) {
+		byte[] serializedBytes = null;
+
+		try {
+			if (kafkaAvroSerializer == null) {
+				this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+				this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
+			}
+
+			String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+			if (jsonAvroConverter == null) {
+				jsonAvroConverter = new JsonAvroConverter();
+			}
+
+			//System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema);
+			GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new Schema.Parser().parse(schema));
+
+			if (GenericData.get().validate(new Schema.Parser().parse(schema), record)) {
+				serializedBytes = kafkaAvroSerializer.serialize(topicName, record);
+
+			} else {
+				System.out.println("Error :Invalid message : Doesn't follow the avro schema : Message not published to the topic, message = " + record.toString());
+
+			}
+
+		} catch (Exception ex) {
+			ex.printStackTrace();
--- End diff --

No proper exception handling.
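
A hedged sketch of tighter handling, as a drop-in replacement for the catch block above (assumes imports of java.io.IOException and io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException, the two checked exceptions thrown by getLatestSchemaMetadata):

		} catch (RestClientException | IOException ex) {
			// SerializationSchema#serialize declares no checked exceptions, so wrap and rethrow;
			// swallowing the error here would silently drop records.
			throw new RuntimeException("Could not serialize record for topic " + topicName, ex);
		}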


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512665#comment-16512665
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472438
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+	}
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+		this.topicName = topicName;
+		this.avroType = avroType;
+	}
+
+	@Override
+	public byte[] serialize(T obj) {
+		byte[] serializedBytes = null;
+
+		try {
+			if (kafkaAvroSerializer == null) {
+				this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+				this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
+			}
+
+			String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+			if (jsonAvroConverter == null) {
+				jsonAvroConverter = new JsonAvroConverter();
+			}
+
+			//System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema);
+			GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new Schema.Parser().parse(schema));
+
+			if (GenericData.get().validate(new Schema.Parser().parse(schema), record)) {
+				serializedBytes = kafkaAvroSerializer.serialize(topicName, record);
+
+			} else {
+				System.out.println("Error :Invalid message : Doesn't follow the avro schema : Message not published to the topic, message = " + record.toString());
--- End diff --

No println logging. Better to use proper loggers.
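
A small sketch of the slf4j alternative (the class and method names here are illustrative only):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SerializationLoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(SerializationLoggingSketch.class);

	void reportInvalidRecord(String topicName, Object record) {
		// Parameterized logging: no string concatenation when the level is disabled.
		LOG.error("Invalid message: does not match the Avro schema; not published to topic {}, message = {}",
			topicName, record);
	}
}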


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.

[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512657#comment-16512657
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465338
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
+	private final String schemaRegistryUrl;
+	private final int identityMapCapacity;
+	private KafkaAvroDecoder kafkaAvroDecoder;
+
+	private ObjectMapper mapper;
+
+	private JsonAvroConverter jsonAvroConverter;
--- End diff --

`transient`
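
Spelled out, the reviewer's point as a sketch (the lazy null-checks in deserialize() already re-create these helpers after the schema instance is shipped to a task manager):

	// Non-serializable helpers should not travel with the serialized schema instance.
	private transient KafkaAvroDecoder kafkaAvroDecoder;
	private transient ObjectMapper mapper;
	private transient JsonAvroConverter jsonAvroConverter;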


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512653#comment-16512653
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464456
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>tech.allegro.schema.json2avro</groupId>
+			<artifactId>converter</artifactId>
+			<version>${json2avro.version}</version>
+		</dependency>
--- End diff --

Why do we need this dependency?


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512655#comment-16512655
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465088
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>tech.allegro.schema.json2avro</groupId>
+			<artifactId>converter</artifactId>
+			<version>${json2avro.version}</version>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>build-jar</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<build>
+				<resources>
+					<resource>
+						<directory>src/main/resources/${resource.dir}</directory>
+						<filtering>true</filtering>
+					</resource>
+				</resources>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<version>2.4.1</version>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<artifactSet>
+										<excludes>
+											<exclude>org.apache.flink:force-shading</exclude>
+											<exclude>com.google.code.findbugs:jsr305</exclude>
+											<exclude>org.slf4j:*</exclude>
+										</excludes>
+									</artifactSet>
+								</configuration>
+							</execution>
+						</executions>