[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513429#comment-16513429 ]

ASF GitHub Bot commented on FLINK-9187:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5857#discussion_r195644101

    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +    public static final String ARG_HOST = "host";
    +    public static final String ARG_PORT = "port";
    +
    +    public static final char JOB_NAME_SEPARATOR = '-';
    +    public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
    +
    +    private PushGateway pushGateway;
    +    private final String jobName;
    +
    +    public PrometheusPushGatewayReporter() {
    +        String random = new AbstractID().toString();
    +        jobName = JOB_NAME_PREFIX + random;
--- End diff --

At the _very least_ we should already be using TM IDs.

> add prometheus pushgateway reporter
> -----------------------------------
>
>                 Key: FLINK-9187
>                 URL: https://issues.apache.org/jira/browse/FLINK-9187
>             Project: Flink
>          Issue Type: New Feature
>          Components: Metrics
>    Affects Versions: 1.4.2
>            Reporter: lamber-ken
>            Priority: Minor
>              Labels: features
>             Fix For: 1.6.0
>
> Make the Flink system able to send metrics to Prometheus via Pushgateway.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
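The reviewer's concern is that a fresh random `AbstractID` per reporter instance creates a new Pushgateway metric group on every restart, leaking stale groups. A minimal sketch of the suggested direction, assuming a stable TaskManager ID were made available to the reporter (the `tmId` parameter and class name here are hypothetical, not Flink's actual API):

```java
// Sketch of the reviewer's suggestion: derive the Pushgateway job name from a
// stable TaskManager ID rather than a random AbstractID, so a restarted
// reporter overwrites its previous metric group instead of creating a new one.
// How the TM ID reaches the reporter is exactly what the review leaves open.
public class PushGatewayJobName {
    static final char JOB_NAME_SEPARATOR = '-';
    static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;

    static String jobNameFor(String tmId) {
        // Stable input => stable job name across restarts of the same TM.
        return JOB_NAME_PREFIX + tmId;
    }
}
```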
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513423#comment-16513423 ]

ASF GitHub Bot commented on FLINK-9187:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5857#discussion_r195643808

    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    [... same hunk as quoted above, ending at the jobName assignment ...]
--- End diff --

I just don't see a reason to rush this. There's a known issue we have to fix, and the PR is not at risk of becoming outdated in the meantime. If the argument is that "other people might start using it already", then we may just end up unnecessarily breaking their setup before the release.
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513390#comment-16513390 ]

Shimin Yang commented on FLINK-9567:
------------------------------------

I modified the onContainerCompleted method and am running it for testing.

> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Priority: Critical
>         Attachments: FlinkYarnProblem, fulllog.txt
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in specific cases. In the worst case, I had a job configured for 5 task managers but ended up with more than 100 containers. Although the job didn't fail, it affected other jobs in the Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be invoked by Yarn's *AMRMAsyncClient*, was never received. After the restart, as we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it did not have a connection to the TaskManager on container 24, so it just ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called *requestYarnContainer*, which increased the *numPendingContainerRequests* variable in *YarnResourceManager* by 1.
> As the return of excess containers is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot return this container even though it is not required. Meanwhile, the restart logic has already allocated enough containers for the Task Managers, so Flink holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running TaskManagers.
> ps: Another strange thing I found is that sometimes when requesting a yarn container, Yarn returns many more than requested. Is that a normal scenario for AMRMAsyncClient?
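The accounting problem described in the issue can be modeled without YARN. A self-contained sketch, under the assumption stated in its comments (class and method names are hypothetical, not Flink's actual `YarnResourceManager` API), of the guard that would keep `numPendingContainerRequests` from inflating:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the bookkeeping bug described above: if a completed
// container triggers a new container request unconditionally, a stale
// container (whose TaskManager connection was already lost) inflates
// numPendingContainerRequests, and the excess container is never returned.
class ContainerBookkeeping {
    private final Set<String> connectedTaskManagers = new HashSet<>();
    private int numPendingContainerRequests = 0;

    void taskManagerConnected(String containerId) {
        connectedTaskManagers.add(containerId);
    }

    // Guarded version: only request a replacement when the completed container
    // actually hosted a connected TaskManager and we are still below the
    // required count (counting already-pending requests).
    void onContainerCompleted(String containerId, int requiredTaskManagers) {
        boolean hadConnection = connectedTaskManagers.remove(containerId);
        int available = connectedTaskManagers.size() + numPendingContainerRequests;
        if (hadConnection && available < requiredTaskManagers) {
            numPendingContainerRequests++;
        }
    }

    int pendingRequests() {
        return numPendingContainerRequests;
    }
}
```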
[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou reassigned FLINK-9455:
---------------------------------

    Assignee: Till Rohrmann  (was: Sihua Zhou)

> Make SlotManager aware of multi slot TaskManagers
> -------------------------------------------------
>
>                 Key: FLINK-9455
>                 URL: https://issues.apache.org/jira/browse/FLINK-9455
>             Project: Flink
>          Issue Type: Improvement
>          Components: ResourceManager
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>             Fix For: 1.6.0, 1.5.1
>
> The {{SlotManager}}, responsible for managing all available slots of a Flink cluster, can request new {{TaskManagers}} to be started if it cannot fulfill a slot request. The started {{TaskManager}} can be configured with multiple slots, but currently the {{SlotManager}} assumes it is started with a single slot. As a consequence, it might issue multiple requests to start new TaskManagers even though a single one would be sufficient to fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the idle timeout, I suggest making the {{SlotManager}} aware of how many slots a {{TaskManager}} is started with. That way the SlotManager only needs to request a new {{TaskManager}} if all of the previously started slots (potentially not yet registered and, thus, future slots) are being assigned to slot requests.
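A multi-slot-aware SlotManager essentially changes the request arithmetic from "one TaskManager per pending slot" to a ceiling division. A minimal sketch of that bookkeeping (the class and method names are illustrative, not Flink's actual API):

```java
// Hypothetical helper illustrating the request arithmetic in the issue:
// with slotsPerTaskManager known, the number of TaskManagers to request for
// the remaining pending slot requests is ceil(pending / slotsPerTM),
// not one TaskManager per pending slot request.
public class TaskManagerRequestMath {
    static int taskManagersToRequest(int pendingSlotRequests, int slotsPerTaskManager) {
        if (pendingSlotRequests <= 0) {
            return 0;
        }
        // ceiling division without floating point
        return (pendingSlotRequests + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

With 5 pending slot requests and 2 slots per TaskManager, this requests 3 TaskManagers instead of the 5 a slot-unaware SlotManager would start.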
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513336#comment-16513336 ]

Sihua Zhou commented on FLINK-9455:
-----------------------------------

Hi [~till.rohrmann], the more I thought about this, the trickier I found it. Since you may be the one most familiar with this part, and I don't want to mess this up, I'll leave this issue to you.
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513320#comment-16513320 ]

ASF GitHub Bot commented on FLINK-9168:
---------------------------------------

Github user XiaoZYang commented on the issue:

    https://github.com/apache/flink/pull/5845

    @zentol got it! So what you mean is that @sijie and I will keep updating commits until this work is done, and you will then merge this PR into a new branch; later PRs will be made against that new branch. Did I misunderstand you?

> Pulsar Sink Connector
> ---------------------
>
>                 Key: FLINK-9168
>                 URL: https://issues.apache.org/jira/browse/FLINK-9168
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>    Affects Versions: 1.6.0
>            Reporter: Zongyang Xiao
>            Priority: Minor
>             Fix For: 1.6.0
>
> Flink does not provide a sink connector for Pulsar.
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513319#comment-16513319 ]

ASF GitHub Bot commented on FLINK-9563:
---------------------------------------

Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170

    @zentol, please take a look when you get the chance. I think testSimpleAfterMatchSkip() is failing [here](https://api.travis-ci.org/v3/job/392551505/log.txt) because the program arrived at the assert statement before the result stream was fully written to the sink. This test was passing in the Travis CI build of my fork. Maybe we can add a CountDownLatch to control for the size of the ArrayList VALUES? A rich sink function would then count down upon closing. What would you suggest?

> Migrate integration tests for CEP
> ---------------------------------
>
>                 Key: FLINK-9563
>                 URL: https://issues.apache.org/jira/browse/FLINK-9563
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Deepak Sharma
>            Assignee: Deepak Sharma
>            Priority: Minor
>
> Covers all integration tests under apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep
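The CountDownLatch idea can be sketched without Flink's sink API. A minimal, self-contained model of the proposed synchronization (names such as `LatchedSink` and `awaitResults` are illustrative, not the actual test code): the sink counts down once it has seen the expected number of elements, and the test thread waits on the latch before asserting, instead of racing the stream's completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal model of the proposed fix: collect results, release the latch once
// the expected count has arrived, and let the test block on awaitResults()
// before it asserts on the collected values.
class LatchedSink {
    private final List<String> values = new ArrayList<>();
    private final CountDownLatch allReceived = new CountDownLatch(1);
    private final int expectedCount;

    LatchedSink(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    // Called once per result element (in a Flink sink this would be invoke()).
    synchronized void accept(String value) {
        values.add(value);
        if (values.size() >= expectedCount) {
            allReceived.countDown();
        }
    }

    // The test calls this before asserting; returns false on timeout.
    boolean awaitResults(long timeoutMillis) throws InterruptedException {
        return allReceived.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    synchronized List<String> collected() {
        return new ArrayList<>(values);
    }
}
```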
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shimin Yang updated FLINK-9567:
-------------------------------
    Description: 
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in specific cases. In the worst case, I had a job configured for 5 task managers but ended up with more than 100 containers. Although the job didn't fail, it affected other jobs in the Yarn cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be invoked by Yarn's *AMRMAsyncClient*, was never received. After the restart, as we can see in line 347 of the FlinkYarnProblem log:

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it did not have a connection to the TaskManager on container 24, so it just ignored the close of the TaskManager:

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called *requestYarnContainer*, which increased the *numPendingContainerRequests* variable in *YarnResourceManager* by 1. As the return of excess containers is determined by *numPendingContainerRequests*, Flink cannot return this container even though it is not required. Meanwhile, the restart logic has already allocated enough containers for the Task Managers, so Flink holds the extra container for a long time for nothing.

In the full log, the job ended with 7 containers while only 3 were running TaskManagers.

ps: Another strange thing I found is that sometimes when requesting a yarn container, Yarn returns many more than requested. Is that a normal scenario for AMRMAsyncClient?

  was:
After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in specific cases. In the worst case, I had a job configured for 5 task managers but ended up with more than 100 containers. Although the job didn't fail, it affected other jobs in the Yarn cluster.

In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be invoked by Yarn's *AMRMAsyncClient*, was never received. After the restart, as we can see in line 347 of the FlinkYarnProblem log:

2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]

Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it did not have a connection to the TaskManager on container 24, so it just ignored the close of the TaskManager:

2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.

However, before calling *closeTaskManagerConnection*, it had already called *requestYarnContainer*, which increased the *numPendingContainerRequests* variable in *YarnResourceManager* by 1. As the return of excess containers is determined by *numPendingContainerRequests*, Flink cannot return this container even though it is not required. Meanwhile, the restart logic has already allocated enough containers for the Task Managers, so Flink holds the extra container for a long time for nothing.

ps: Another strange thing I found is that sometimes when requesting a yarn container, Yarn returns many more than requested. Is that a normal scenario for AMRMAsyncClient?
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Description: After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not release task manager containers in some specific case. In the worst case, I had a job configured to 5 task managers, but possess more than 100 containers in the end. Although the task didn't failed, but it affect other jobs in the Yarn Cluster. In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. As the container was killed before restart, but it has not received the callback of *onContainerComplete* in *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. After restart, as we can see in line 347 of FlinkYarnProblem log, 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated] Flink lost the connection of container 24 which is on bd-r1hdp69 machine. When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it did not has the connection to TaskManager on container 24, so it just ignore the close of TaskManger. 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection. However, bafore calling *closeTaskManagerConnection,* it already called *requestYarnContainer* which lead to *numPendingContainerRequests variable in* *YarnResourceManager* increased by 1. As the excessive container return is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot return this container although it is not required. 
Meanwhile, the restart logic has already allocated enough containers for Task Managers, Flink will possess the extra container for a long time for nothing. ps: Another strange thing I found is that when sometimes request for a yarn container, it will return much more than requested. Is it a normal scenario for AMRMAsyncClient? was: After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not release task manager containers in some specific case. In the worst case, I had a job configured to 5 task managers, but possess more than 100 containers in the end. Although the task didn't failed, but it affect other jobs in the Yarn Cluster. In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. As the container was killed before restart, but it has not received the callback of *onContainerComplete* in *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. After restart, as we can see in line 347 of FlinkYarnProblem log, 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated] Flink lost the connection of container 24 which is on bd-r1hdp69 machine. When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it did not has the connection to TaskManager on container 24, so it just ignore the close of TaskManger. 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection. 
However, bafore calling *closeTaskManagerConnection,* it already called *requestYarnContainer* which lead to *numPendingContainerRequests variable in* *YarnResourceManager* increased by 1.** As the excessive container return is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot return this container although it is not required*.* Meanwhile, the restart logic has already allocated enough containers for Task Managers, Flink will possess the extra container for a long time for nothing. ps: Another strange thing I found is that when sometimes request for a yarn container, it will return much more than requested. Is it a normal scenario for AMRMAsyncClient? > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Critical > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not > release task manager containers in some specific case. In the worst case, I > had a job configured to 5 task
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Description: After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not release task manager containers in some specific case. In the worst case, I had a job configured to 5 task managers, but possess more than 100 containers in the end. Although the task didn't failed, but it affect other jobs in the Yarn Cluster. In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. As the container was killed before restart, but it has not received the callback of *onContainerComplete* in *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. After restart, as we can see in line 347 of FlinkYarnProblem log, 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated] Flink lost the connection of container 24 which is on bd-r1hdp69 machine. When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it did not has the connection to TaskManager on container 24, so it just ignore the close of TaskManger. 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection. 
However, before calling *closeTaskManagerConnection*, it had already called *requestYarnContainer*, which incremented the *numPendingContainerRequests* variable in *YarnResourceManager* by 1. As the return of excess containers is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, this container cannot be returned even though it is not required. Meanwhile, since the restart logic has already allocated enough containers for the Task Managers, Flink holds the extra container for a long time for nothing. ps: Another strange thing I found is that sometimes a request for a yarn container returns many more containers than requested. Is this a normal scenario for AMRMAsyncClient? was: After restarting the Job Manager in Yarn Cluster mode, Flink does not release Task Manager containers in certain specific cases. In the first log I posted, the container with id 24 is the reason why Yarn did not release resources, although the Task Manager in that container was released before the restart. But in line 347, 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated] this problem caused Flink to request one more container than needed. As the return of excess containers is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, I think *onContainersCompleted* in *YarnResourceManager* called the method *requestYarnContainer*, which leads to the increase of *numPendingContainerRequests*. However, since the restart logic has already allocated enough containers for the Task Managers, Flink holds the extra container for a long time for nothing. In the worst case, I had a job configured for 5 task managers but ended up holding more than 100 containers. ps: Another strange thing I found is that sometimes a request for a yarn container returns many more than requested. 
Is it a normal scenario for AMRMAsyncClient? > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Major > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not > release task manager containers in some specific case. In the worst case, I > had a job configured to 5 task managers, but possess more than 100 containers > in the end. Although the task didn't failed, but it affect other jobs in the > Yarn Cluster. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. As the container was killed before restart, but it > has not received the callback of *onContainerComplete* in > *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. > After restart, as we can see in line 347 of FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] m
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Priority: Critical (was: Major) > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Critical > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not > release task manager containers in some specific case. In the worst case, I > had a job configured to 5 task managers, but possess more than 100 containers > in the end. Although the task didn't failed, but it affect other jobs in the > Yarn Cluster. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. As the container was killed before restart, but it > has not received the callback of *onContainerComplete* in > *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. > After restart, as we can see in line 347 of FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection of container 24 which is on bd-r1hdp69 machine. > When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it > did not has the connection to TaskManager on container 24, so it just ignore > the close of TaskManger. > 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. 
> However, before calling *closeTaskManagerConnection*, it had already called > *requestYarnContainer*, which incremented the *numPendingContainerRequests* variable > in *YarnResourceManager* by 1. > As the return of excess containers is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, this container cannot > be returned even though it is not required. Meanwhile, since the restart > logic has already allocated enough containers for the Task Managers, Flink > holds the extra container for a long time for nothing. > ps: Another strange thing I found is that sometimes a request for a yarn > container returns many more than requested. Is this a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Attachment: fulllog.txt > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Major > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, Flink does not release > task manager containers in some specific case. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. Although the Task Manager in the container with id > 24 was released before restart. > But in line 347, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > this problem caused flink to request for one more container more than need. > As the excessive container return id determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, I think it's > the *onContainersCompleted* in *YarnResourceManager* called the method > *requestYarnContainer* which leads to the increase of > *numPendingContainerRequests.* However, the restart logic has already > allocated enough containers for Task Managers, Flink will possess the extra > container for a long time for nothing. In the worst case, I had a job > configured to 5 task managers, but possess more than 100 containers in the > end. > ps: Another strange thing I found is that when sometimes request for a yarn > container, it will return much more than requested. Is it a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Attachment: FlinkYarnProblem > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Major > Attachments: FlinkYarnProblem > > > After restart the Job Manager in Yarn Cluster mode, Flink does not release > task manager containers in some specific case. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. Although the Task Manager in the container with id > 24 was released before restart. > But in line 347, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > this problem caused flink to request for one more container more than need. > As the excessive container return id determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, I think it's > the *onContainersCompleted* in *YarnResourceManager* called the method > *requestYarnContainer* which leads to the increase of > *numPendingContainerRequests.* However, the restart logic has already > allocated enough containers for Task Managers, Flink will possess the extra > container for a long time for nothing. In the worst case, I had a job > configured to 5 task managers, but possess more than 100 containers in the > end. > ps: Another strange thing I found is that when sometimes request for a yarn > container, it will return much more than requested. Is it a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Attachment: (was: jobmanager.log) > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Major > Attachments: FlinkYarnProblem > > > After restart the Job Manager in Yarn Cluster mode, Flink does not release > task manager containers in some specific case. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. Although the Task Manager in the container with id > 24 was released before restart. > But in line 347, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > this problem caused flink to request for one more container more than need. > As the excessive container return id determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, I think it's > the *onContainersCompleted* in *YarnResourceManager* called the method > *requestYarnContainer* which leads to the increase of > *numPendingContainerRequests.* However, the restart logic has already > allocated enough containers for Task Managers, Flink will possess the extra > container for a long time for nothing. In the worst case, I had a job > configured to 5 task managers, but possess more than 100 containers in the > end. > ps: Another strange thing I found is that when sometimes request for a yarn > container, it will return much more than requested. Is it a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-9567: --- Description: After restarting the Job Manager in Yarn Cluster mode, Flink does not release Task Manager containers in certain specific cases. In the first log I posted, the container with id 24 is the reason why Yarn did not release resources, although the Task Manager in that container was released before the restart. But in line 347, 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated] this problem caused Flink to request one more container than needed. As the return of excess containers is determined by the *numPendingContainerRequests* variable in *YarnResourceManager*, I think *onContainersCompleted* in *YarnResourceManager* called the method *requestYarnContainer*, which leads to the increase of *numPendingContainerRequests*. However, since the restart logic has already allocated enough containers for the Task Managers, Flink holds the extra container for a long time for nothing. In the worst case, I had a job configured for 5 task managers but ended up holding more than 100 containers. ps: Another strange thing I found is that sometimes a request for a yarn container returns many more than requested. Is this a normal scenario for AMRMAsyncClient? was: After restarting the Job Manager in Yarn Cluster mode, Flink does not release Task Manager containers in certain specific cases. According to my observation, the reason is that the instance variable *numPendingContainerRequests* in the *YarnResourceManager* class does not decrease, since the requested containers were never received. After the Job Manager restarts, *numPendingContainerRequests* increases again by the number of task executors. 
Since the callback function *onContainersAllocated* returns an excess container immediately only if *numPendingContainerRequests* <= 0, the number of containers grows bigger and bigger while only a few are acting as task managers. I think it is important to clear the *numPendingContainerRequests* variable after restarting the Job Manager, but I am not very clear on how to do that. There is no other way to decrease *numPendingContainerRequests* except through *onContainersAllocated*. Would it be fine to add a method to operate on the *numPendingContainerRequests* variable? Meanwhile, there is no handle to the YarnResourceManager in the *ExecutionGraph* restart logic. ps: Another strange thing I found is that sometimes a request for a yarn container returns many more than requested. Is this a normal scenario for AMRMAsyncClient? > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Major > Attachments: jobmanager.log > > > After restart the Job Manager in Yarn Cluster mode, Flink does not release > task manager containers in some specific case. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. Although the Task Manager in the container with id > 24 was released before restart. > But in line 347, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > this problem caused flink to request for one more container more than need. 
> As the return of excess containers is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, I think > *onContainersCompleted* in *YarnResourceManager* called the method > *requestYarnContainer*, which leads to the increase of > *numPendingContainerRequests*. However, since the restart logic has already > allocated enough containers for the Task Managers, Flink holds the extra > container for a long time for nothing. In the worst case, I had a job > configured for 5 task managers but ended up holding more than 100 containers. > ps: Another strange thing I found is that sometimes a request for a yarn > container returns many more than requested. Is this a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
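The container accounting described in this issue can be condensed into a tiny standalone model. This is a hedged sketch: the method names mirror those mentioned for *YarnResourceManager* (*onContainersCompleted*, *requestYarnContainer*, *onContainersAllocated*), but the class below is an illustrative assumption, not Flink's actual implementation.

```java
// Illustrative model of the numPendingContainerRequests accounting described
// in FLINK-9567. Names follow the issue text; the real YarnResourceManager
// has far more state and is not reproduced here.
class PendingContainerModel {
    private int numPendingContainerRequests = 0;

    // Per the report, a completed (e.g. killed) container triggers a
    // replacement request, incrementing the pending counter even when the
    // restart logic has already re-allocated enough containers.
    void onContainersCompleted() {
        requestYarnContainer();
    }

    private void requestYarnContainer() {
        numPendingContainerRequests++;
    }

    // An allocated container is kept while requests are pending; it is only
    // returned to YARN once the pending counter has dropped to zero. A stale
    // pending request therefore makes Flink keep a surplus container.
    boolean onContainersAllocated() {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            return true;  // container kept
        }
        return false;     // container returned to YARN
    }

    int pending() {
        return numPendingContainerRequests;
    }
}
```

Walking this model through the reported sequence shows the leak: the spurious replacement request raises the counter, so the next allocated container is kept instead of returned.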
[jira] [Commented] (FLINK-9573) Check for leadership with leader session id
[ https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513210#comment-16513210 ] ASF GitHub Bot commented on FLINK-9573: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6154#discussion_r195613671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java --- @@ -356,8 +357,8 @@ public void confirmLeaderSessionID(UUID leaderSessionID) { } @Override - public boolean hasLeadership() { - return isLeader; + public boolean hasLeadership(@Nonnull UUID leaderSessionId) { + return isLeader && leaderSessionId.equals(currentLeaderSessionId); } --- End diff -- Thanks @tillrohrmann, that makes better sense to me now. Yeah, I did not notice there was a volatile there before. Thanks again. > Check for leadership with leader session id > --- > > Key: FLINK-9573 > URL: https://issues.apache.org/jira/browse/FLINK-9573 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > In order to check whether a {{LeaderContender}} is still the leader, it is > not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. > Instead, we should extend this method to also take the leader session id as a > parameter to distinguish between different calls from the same leader > contender with different leader session ids. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6154: [FLINK-9573] Extend LeaderElectionService#hasLeade...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6154#discussion_r195613671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java --- @@ -356,8 +357,8 @@ public void confirmLeaderSessionID(UUID leaderSessionID) { } @Override - public boolean hasLeadership() { - return isLeader; + public boolean hasLeadership(@Nonnull UUID leaderSessionId) { + return isLeader && leaderSessionId.equals(currentLeaderSessionId); } --- End diff -- Thanks @tillrohrmann, that makes better sense to me now. Yeah, I did not notice there was a volatile there before. Thanks again. ---
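For illustration, the fencing check in the diff can be modeled in isolation. This is only a sketch: the field names follow the diff, but the surrounding EmbeddedLeaderService (its lock, listeners, and grant/revoke paths) is omitted, so this class is an assumption built for demonstration.

```java
import java.util.UUID;

// Standalone sketch of the session-fenced leadership check from the diff:
// leadership now requires both the leader flag and a matching session id,
// so stale calls carrying an old leader session id are rejected.
class LeaderCheck {
    // volatile, as in the real service, so concurrent readers see updates.
    volatile boolean isLeader;
    volatile UUID currentLeaderSessionId;

    boolean hasLeadership(UUID leaderSessionId) {
        // equals(null) is false, so a not-yet-confirmed session id
        // (currentLeaderSessionId == null) also denies leadership.
        return isLeader && leaderSessionId.equals(currentLeaderSessionId);
    }
}
```

A contender from an older election round, still holding its old UUID, now gets `false` even while some instance is leader, which is exactly the distinction FLINK-9573 asks for.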
[jira] [Closed] (FLINK-9590) HistogramDump should be immutable
[ https://issues.apache.org/jira/browse/FLINK-9590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9590. --- Resolution: Fixed master: 24266fd63b72c76e77ed86e46392f7427e4d1f8b > HistogramDump should be immutable > - > > Key: FLINK-9590 > URL: https://issues.apache.org/jira/browse/FLINK-9590 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.6.0 > > > The {{HistogramDump}} represents the contents of a histogram at one point in > time, and should thus not be mutable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9591) Remove remnants of distributed-cache logic
[ https://issues.apache.org/jira/browse/FLINK-9591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9591. --- Resolution: Fixed master: 0dc0e36f204401f14e078d24348421cd4140577d > Remove remnants of distributed-cache logic > -- > > Key: FLINK-9591 > URL: https://issues.apache.org/jira/browse/FLINK-9591 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > There are still some remnants in the python API for the old distributed cache > logic, where we uploaded files first to a DFS before registering them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9589) PythonOperationInfo should be immutable
[ https://issues.apache.org/jira/browse/FLINK-9589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9589. --- Resolution: Fixed master: 2fc6499496296cf2aa2408e4e15d684496a169a2 > PythonOperationInfo should be immutable > --- > > Key: FLINK-9589 > URL: https://issues.apache.org/jira/browse/FLINK-9589 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.2.1, 1.3.3, 1.5.0, 1.4.2, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > The {{PythonOperationInfo}} is a simple represantation of a dataset operation > defined by the python plan. Thus the entire object should be immutable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8744) Add annotations for documenting common/advanced options
[ https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8744. --- Resolution: Fixed master: eaff4da15bdd7528dcc0d8a37fd59642cee53850 > Add annotations for documenting common/advanced options > --- > > Key: FLINK-8744 > URL: https://issues.apache.org/jira/browse/FLINK-8744 > Project: Flink > Issue Type: New Feature > Components: Configuration, Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.6.0 > > > The {{ConfigDocsGenerator}} only generates [the full configuration > reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference]. > The > [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options] > and > [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options] > sections are still manually defined in {{config.md}} and thus prone to being > outdated / out-of-sync with the full reference. > I propose adding {{@Common}}/{{@Advanced}} annotations based on which we > could generate these sections as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9530) Task numRecords metrics broken for chains
[ https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513144#comment-16513144 ] ASF GitHub Bot commented on FLINK-9530: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6126 > Task numRecords metrics broken for chains > - > > Key: FLINK-9530 > URL: https://issues.apache.org/jira/browse/FLINK-9530 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > The {{numRecordsIn/Out}} metrics for tasks is currently broken. We are > wrongly adding up the numRecordsIn/Out metrics for all operators in the > chain, instead of just the head/tail operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8744) Add annotations for documenting common/advanced options
[ https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513146#comment-16513146 ] ASF GitHub Bot commented on FLINK-8744: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5843 > Add annotations for documenting common/advanced options > --- > > Key: FLINK-8744 > URL: https://issues.apache.org/jira/browse/FLINK-8744 > Project: Flink > Issue Type: New Feature > Components: Configuration, Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.6.0 > > > The {{ConfigDocsGenerator}} only generates [the full configuration > reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference]. > The > [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options] > and > [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options] > sections are still manually defined in {{config.md}} and thus prone to being > outdated / out-of-sync with the full reference. > I propose adding {{@Common}}/{{@Advanced}} annotations based on which we > could generate these sections as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code
[ https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9257. --- Resolution: Fixed Fix Version/s: 1.5.1 master:45ac85e2c4ec66ff02e4277c1781247710252a26 1.5: 0796ac7e212f8d6197e5db95b1a85417767b247c > End-to-end tests prints "All tests PASS" even if individual test-script > returns non-zero exit code > -- > > Key: FLINK-9257 > URL: https://issues.apache.org/jira/browse/FLINK-9257 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > In some cases the test-suite exits with non-zero exit code but still prints > "All tests PASS" to stdout. This happens because how the test runner works, > which is roughly as follows > # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of > tests consisting of one multiple bash scripts. > # As soon as one of those bash scripts exists with non-zero exit code, the > tests won't continue to run and the test-suite will also exit with non-zero > exit code. > # *During the cleanup hook (trap cleanup EXIT in common.sh) it will be > checked whether there are non-empty out files or log files with certain > exceptions. If a tests fails with non-zero exit code, but does not have any > exceptions or .out files, this will still print "All tests PASS" to stdout, > even though they don't* > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code
[ https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513145#comment-16513145 ] ASF GitHub Bot commented on FLINK-9257: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6053 > End-to-end tests prints "All tests PASS" even if individual test-script > returns non-zero exit code > -- > > Key: FLINK-9257 > URL: https://issues.apache.org/jira/browse/FLINK-9257 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > In some cases the test-suite exits with non-zero exit code but still prints > "All tests PASS" to stdout. This happens because how the test runner works, > which is roughly as follows > # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of > tests consisting of one multiple bash scripts. > # As soon as one of those bash scripts exists with non-zero exit code, the > tests won't continue to run and the test-suite will also exit with non-zero > exit code. > # *During the cleanup hook (trap cleanup EXIT in common.sh) it will be > checked whether there are non-empty out files or log files with certain > exceptions. If a tests fails with non-zero exit code, but does not have any > exceptions or .out files, this will still print "All tests PASS" to stdout, > even though they don't* > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6126: [FLINK-9530][metrics] Fix numRecords task metric f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6126 ---
[GitHub] flink pull request #5843: [FLINK-8744][docs] Generate "Common Option" sectio...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5843 ---
[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6053 ---
[jira] [Closed] (FLINK-9530) Task numRecords metrics broken for chains
[ https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9530. --- Resolution: Fixed master: 03d77b862b845fc2b2836fc35fdbc8793d5064b7 1.5: 246ef1d291ef8ba6907c368e1cee42d20a5e617a > Task numRecords metrics broken for chains > - > > Key: FLINK-9530 > URL: https://issues.apache.org/jira/browse/FLINK-9530 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > The {{numRecordsIn/Out}} metrics for tasks is currently broken. We are > wrongly adding up the numRecordsIn/Out metrics for all operators in the > chain, instead of just the head/tail operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513016#comment-16513016 ] ASF GitHub Bot commented on FLINK-9563: --- Github user deepaks4077 commented on the issue: https://github.com/apache/flink/pull/6170 @zentol, thanks, will do! > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...
Github user deepaks4077 commented on the issue: https://github.com/apache/flink/pull/6170 @zentol, thanks, will do! ---
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513012#comment-16513012 ] ASF GitHub Bot commented on FLINK-8983: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6083 @tillrohrmann Thanks for the review! I will go through them and will make the changes shortly. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512985#comment-16512985 ] ASF GitHub Bot commented on FLINK-9563: --- GitHub user deepaks4077 opened a pull request: https://github.com/apache/flink/pull/6170

[FLINK-9563]: Using a custom sink function for tests in CEPITCase instead of writing to disk

## What is the purpose of the change

This change modifies the CEPITCase integration test to use a custom sink function to collect and compare test results, instead of writing them to a file. It does not add/remove any constituent tests.

## Brief change log

- Removed Before and After JUnit annotations
- Added a custom sink function with a static ArrayList to collect and compare test results

## Verifying this change

This change is already covered by existing tests, such as CEPITCase, which is the end-to-end test of the CEP API.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/deepaks4077/flink FLINK-9563

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6170.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6170

commit 8fc629a557af4a56ab7638cc5eb519e163267cdc Author: Deepak Sharnma Date: 2018-06-13T02:41:27Z [FLINK-9563]: Using a custom sink function for tests in CEPITCase

> Migrate integration tests for CEP
> ---------------------------------
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
> Issue Type: Sub-task
> Reporter: Deepak Sharma
> Assignee: Deepak Sharma
> Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
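The collector-sink pattern this PR describes — replacing file round-trips with an in-memory sink the test can assert on — can be sketched as below. This is an illustrative sketch, not the PR's actual code: the `SinkFunction` interface here is a simplified stand-in for Flink's `org.apache.flink.streaming.api.functions.sink.SinkFunction`, and the `CollectSink`/`CollectSinkDemo` names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for Flink's SinkFunction interface (the real test would
// implement org.apache.flink.streaming.api.functions.sink.SinkFunction).
interface SinkFunction<T> {
    void invoke(T value);
}

// Sink that collects records into a static, synchronized list so the test
// method can assert on them after the (possibly parallel) job finishes.
class CollectSink implements SinkFunction<String> {
    // static: all parallel sink instances write to the same list
    static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(String value) {
        VALUES.add(value);
    }
}

public class CollectSinkDemo {
    public static void main(String[] args) {
        CollectSink.VALUES.clear();
        SinkFunction<String> sink = new CollectSink();
        // Simulate the job emitting matched patterns to the sink.
        sink.invoke("start,middle,end");
        sink.invoke("start,end");
        // The test then compares VALUES against the expected results instead
        // of reading part files back from disk.
        System.out.println(CollectSink.VALUES);
    }
}
```

The static field is what makes this work across parallel sink instances, which is also why the list must be cleared (or the results deduplicated) between tests.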
[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinat Sharipov updated FLINK-9592: -- Description: Hi mates, I got a proposal about functionality of BucketingSink. During implementation of one of our tasks we got the following need - create a meta-file, with the path and additional information about the file, created by BucketingSink, when it’s been moved into final place. Unfortunately such behaviour is currently not available for us. We’ve implemented our own Sink, that provides an opportunity to register notifiers, that will be called, when file state is changing, but current API doesn’t allow us to add such behaviour using inheritance ... It seems, that such functionality could be useful, and could be a part of BucketingSink API What do you think, should I make a PR ? Sincerely yours, *Rinat Sharipov* Software Engineer at 1DMP CORE Team email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] mobile: +7 (925) 416-37-26 Clever{color:#4f8f00}DATA{color} make your data clever Hi, I see that could be a useful feature. What exactly now is preventing you from inheriting from BucketingSink? Maybe it would be just enough to make the BucketingSink easier extendable. One thing now that could collide with such feature is that Kostas is now working on larger BucketingSink rework/refactor. Piotrek Hi guys, thx for your reply. 
The following code info is actual for the *release-1.5.0 tag, org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.

For now, BucketingSink has the following lifecycle of files.

When moving files from opened to pending state:
# on each item (*method invoke:434 line*), we check that a suitable bucket exists and contains an opened file; when the opened file doesn’t exist, we create one and write the item to it
# on each item (*method invoke:434 line*), we check that the opened file doesn’t exceed the limits, and if the limits are exceeded, we close it and move it into pending state using *closeCurrentPartFile:568 line (a private method)*
# on each timer request (*onProcessingTime:482 line*), we check whether items haven't been added to the opened file for longer than the specified period of time; if so, we close it using the same private method, *closeCurrentPartFile:588 line*

So the only way we have is to call our hook from *closeCurrentPartFile*, which is private, so we copy-pasted the current impl and injected our logic there.

Files move from pending state into final state during the checkpointing lifecycle, in *notifyCheckpointComplete:657 line*, which is public and contains a lot of logic, including discovery of files in pending states, synchronization of state access and its modification, etc. So we couldn’t override it, or call the super method and add some logic, because when the current impl changes the state of files it removes them from state, and we have no way to know for which files the state has been changed.

To solve this problem, we've created the following interface:

/**
 * The {@code FileStateChangeCallback} is used to perform any additional operations, when {@link BucketingSink}
 * moves file from one state to another. For more information about state management of {@code BucketingSink}, look
 * through its official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

    /**
     * Used to perform any additional operations, related with moving of file into next state.
     *
     * @param fs provides access for working with file system
     * @param path path to the file, moved into next state
     *
     * @throws IOException if something went wrong, while performing any operations with file system
     */
    void call(FileSystem fs, Path path) throws IOException;
}

And we have added the ability to register these callbacks in the BucketingSink impl in the following manner:

public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}

I’m ready to discuss the best ways such hooks could be implemented in the core impl, or any other improvements that will help us add such functionality into our extension using the public API instead of copy-pasting the source code.

Thx for your help, mates =)

Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team

email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever

___
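The hook mechanism proposed in the thread above can be sketched in a self-contained way as follows. Note the caveats: `java.nio.file.Path` stands in for the Flink `FileSystem`/`Path` types used in the proposal, the `register*` method names follow the proposal and are not existing Flink API, and `ToyBucketingSink` is a toy model of the part-file lifecycle (in-progress → pending → final), not BucketingSink itself.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// The callback interface from the proposal, with java.nio.file.Path as a
// stand-in for Flink's FileSystem/Path parameters.
interface FileStateChangeCallback {
    void call(Path path) throws IOException;
}

// Toy model of BucketingSink's part-file lifecycle that fires the registered
// hooks on each state transition.
class ToyBucketingSink {
    private final List<FileStateChangeCallback> pendingCallbacks = new ArrayList<>();
    private final List<FileStateChangeCallback> finalCallbacks = new ArrayList<>();

    ToyBucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... cbs) {
        for (FileStateChangeCallback cb : cbs) pendingCallbacks.add(cb);
        return this;
    }

    ToyBucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... cbs) {
        for (FileStateChangeCallback cb : cbs) finalCallbacks.add(cb);
        return this;
    }

    // Corresponds to closeCurrentPartFile(): in-progress file becomes pending.
    Path moveToPending(Path inProgress) throws IOException {
        Path pending = inProgress.resolveSibling(inProgress.getFileName() + ".pending");
        Files.move(inProgress, pending);
        for (FileStateChangeCallback cb : pendingCallbacks) cb.call(pending);
        return pending;
    }

    // Corresponds to notifyCheckpointComplete(): pending file becomes final.
    Path moveToFinal(Path pending) throws IOException {
        String name = pending.getFileName().toString().replace(".pending", "");
        Path fin = pending.resolveSibling(name);
        Files.move(pending, fin);
        for (FileStateChangeCallback cb : finalCallbacks) cb.call(fin);
        return fin;
    }
}

public class CallbackDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucketing");
        Path part = Files.createFile(dir.resolve("part-0-0"));

        List<String> events = new ArrayList<>();
        ToyBucketingSink sink = new ToyBucketingSink()
                .registerOnPendingStateChangeCallback(p -> events.add("pending:" + p.getFileName()))
                .registerOnFinalStateChangeCallback(p -> events.add("final:" + p.getFileName()));

        Path pending = sink.moveToPending(part);
        sink.moveToFinal(pending);
        System.out.println(events);
    }
}
```

The meta-file use case from the thread would be one such callback: on the final-state transition it could write a companion file next to the finished part file.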
[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mikhail Pryakhin updated FLINK-9592: Description: Hi mates, I got a proposal about functionality of BucketingSink. During implementation of one of our tasks we got the following need - create a meta-file, with the path and additional information about the file, created by BucketingSink, when it’s been moved into final place. Unfortunately such behaviour is currently not available for us. We’ve implemented our own Sink, that provides an opportunity to register notifiers, that will be called, when file state is changing, but current API doesn’t allow us to add such behaviour using inheritance ... It seems, that such functionality could be useful, and could be a part of BucketingSink API What do you think, should I make a PR ? Sincerely yours, *Rinat Sharipov* Software Engineer at 1DMP CORE Team email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] mobile: +7 (925) 416-37-26 CleverDATA make your data clever Hi, I see that could be a useful feature. What exactly now is preventing you from inheriting from BucketingSink? Maybe it would be just enough to make the BucketingSink easier extendable. One thing now that could collide with such feature is that Kostas is now working on larger BucketingSink rework/refactor. Piotrek Hi guys, thx for your reply. 
The following code info is actual for *release-1.5.0 tag, org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* For now, BucketingSink has the following lifecycle of files When moving files from opened to pending state: # on each item (*method* *invoke:434* *line*), we check that suitable bucket exist, and contain opened file, in case, when opened file doesn’t exist, we create one, and write item to it # on each item (*method* *invoke:434* *line*), we check that suitable opened file doesn’t exceed the limits, and if limits are exceeded, we close it and move into pending state using *closeCurrentPartFile:568 line - private method* # on each timer request (*onProcessingTime:482 line*), we check, if items haven't been added to the opened file longer, than specified period of time, we close it, using the same private method *closeCurrentPartFile:588 line* So, the only way, that we have, is to call our hook from *closeCurrentPartFile*, that is private, so we copy-pasted the current impl and injected our logic there Files are moving from pending state into final, during checkpointing lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and contains a lot of logic, including discovery of files in pending states, synchronization of state access and its modification, etc … So we couldn’t override it, or call super method and add some logic, because when current impl changes the state of files, it removes them from state, and we don’t have any opportunity to know, for which files state have been changed. To solve such problem, we've created the following interface /** * The {@code FileStateChangeCallback} is used to perform any additional operations, when {@link BucketingSink} * moves file from one state to another. For more information about state management of {@code BucketingSink}, look * through its official documentation. 
*/ public interface FileStateChangeCallback extends Serializable { /** * Used to perform any additional operations, related with moving of file into next state. * * @param fs provides access for working with file system * @param path path to the file, moved into next state * * @throws IOException if something went wrong, while performing any operations with file system */ void call(FileSystem fs, Path path) throws IOException; } And have added an ability to register this callbacks in BucketingSink impl in the following manner public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...} public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...} I’m ready to discuss the best ways, how such hooks could be implemented in the core impl or any other improvements, that will help us to add such functionality into our extension, using public api, instead of copy-pasting the source code. Thx for your help, mates =) Sincerely yours, *Rinat Sharipov* Software Engineer at 1DMP CORE Team email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] mobile: +7 (925) 416-37-26 CleverDATA make your data clever
[jira] [Created] (FLINK-9592) Notify on moving into pending/ final state
Rinat Sharipov created FLINK-9592: - Summary: Notify on moving into pending/ final state Key: FLINK-9592 URL: https://issues.apache.org/jira/browse/FLINK-9592 Project: Flink Issue Type: New Feature Components: filesystem-connector Reporter: Rinat Sharipov Hi mates, I got a proposal about functionality of BucketingSink. During implementation of one of our tasks we got the following need - create a meta-file, with the path and additional information about the file, created by BucketingSink, when it’s been moved into final place. Unfortunately such behaviour is currently not available for us. We’ve implemented our own Sink, that provides an opportunity to register notifiers, that will be called, when file state is changing, but current API doesn’t allow us to add such behaviour using inheritance ... It seems, that such functionality could be useful, and could be a part of BucketingSink API What do you think, should I make a PR ? Sincerely yours, *Rinat Sharipov* Software Engineer at 1DMP CORE Team email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] mobile: +7 (925) 416-37-26 CleverDATA make your data clever Hi, I see that could be a useful feature. What exactly now is preventing you from inheriting from BucketingSink? Maybe it would be just enough to make the BucketingSink easier extendable. One thing now that could collide with such feature is that Kostas is now working on larger BucketingSink rework/refactor. Piotrek Hi guys, thx for your reply. 
The following code info is actual for *release-1.5.0 tag, org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* For now, BucketingSink has the following lifecycle of files When moving files from opened to pending state: # on each item (*method* *invoke:434* *line*), we check that suitable bucket exist, and contain opened file, in case, when opened file doesn’t exist, we create one, and write item to it # on each item (*method* *invoke:434* *line*), we check that suitable opened file doesn’t exceed the limits, and if limits are exceeded, we close it and move into pending state using *closeCurrentPartFile:568 line - private method* # on each timer request (*onProcessingTime:482 line*), we check, if items haven't been added to the opened file longer, than specified period of time, we close it, using the same private method *closeCurrentPartFile:588 line* So, the only way, that we have, is to call our hook from *closeCurrentPartFile*, that is private, so we copy-pasted the current impl and injected our logic there Files are moving from pending state into final, during checkpointing lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and contains a lot of logic, including discovery of files in pending states, synchronization of state access and it’s modification, etc … So we couldn’t override it, or call super method and add some logic, because when current impl changes the state of files, it removes them from state, and we don’t have any opportunity to know, for which files state have been changed. To solve such problem, we've created the following interface /** * The {@code FileStateChangeCallback} is used to perform any additional operations, when {@link BucketingSink} * moves file from one state to another. For more information about state management of {@code BucketingSink}, look * through it's official documentation. 
*/ public interface FileStateChangeCallback extends Serializable { /** * Used to perform any additional operations, related with moving of file into next state. * * @param fs provides access for working with file system * * @param path path to the file, moved into next state * * @throws IOException if something went wrong, while performing any operations with file system */ void call(FileSystem fs, Path path) throws IOException; } And have added an ability to register this callbacks in BucketingSink impl in the following manner public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...} public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...} I’m ready to discuss the best ways, how such hooks could be implemented in the core impl or any other improvements, that will help us to add such functionality into our extension, using public api, instead of copy-pasting the source code. Thx for your help, mates =) Sincerely yours, *Rinat Sharipov* Software Engineer at 1DMP CORE Team email: [r.shari...@cleverdata.ru|mailto:a.totma...@
[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinat Sharipov updated FLINK-9592: -- Summary: Notify on moving file into pending/ final state (was: Notify on moving into pending/ final state) > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Priority: Major > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you think, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > CleverDATA > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check, if items > haven't been added to the opened file longer, than specified period of time, > we close it, using the same private method *closeCurrentPartFile:588 line* > > So, the only way, that we have, is to call our hook from > *closeCurrentPartFile*, that is private, so we copy-pasted the current impl > and injected our logic there > > > Files are moving from pending state into final, during checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and > contains a lot of logic, including discovery of files in pending states, > synchronization of state access and it’s modification, etc … > > So we couldn’t override it, or call super method and add some logic, because > when current impl changes the state of files, it removes them from state, and > we don’t have any opportunity to know, > for which files state have been changed. > > To solve such problem, we've created the following interface > > /** > * The {@code FileStateChangeCallback} is used to perform any additional > operations, when {@link BucketingSink} > * moves file from one state to another. For more information about state > management of {@code BucketingSink}, look > * through it's official documentation. 
> */ > public interface FileStateChangeCallback extends Serializable { > /** > * Used to perform any additional operations, related with moving of file > into next state. > * > * @param fs provides access for working with file system > * @param path path to the file, moved into next state > * > * @throws IOException if something went wrong, while performing any > operations with file system > */ > void call(FileSystem fs, Path path) throws IOException; > } > And have added an ability to register this callbacks in BucketingSink impl in > the following manner > > public BucketingSink > registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...} > public BucketingSink > registerOnPendingStateChangeCallback(FileStateChange
[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs
[ https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512898#comment-16512898 ] Deepak Sharma commented on FLINK-9380: -- Hi [~till.rohrmann], would you mind giving examples? I might be able to take a look. > Failing end-to-end tests should not clean up logs > - > > Key: FLINK-9380 > URL: https://issues.apache.org/jira/browse/FLINK-9380 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > Some of the end-to-end tests clean up their logs also in the failure case. > This makes debugging and understanding the problem extremely difficult. > Ideally, the script says where it stored the respective logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6149: [FLINK-9560] Add RateLimiting for FileSystem
Github user etiennecarriere commented on the issue: https://github.com/apache/flink/pull/6149 @pnowojski, I added a unit test which validates the rate-limiting feature, but: * It adds 20s to the unit tests. Would it be better to move it to an integration test (even if it is not a true integration test)? * I allow 10 seconds for each test to have some stability. Open to your suggestions. ---
[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem
[ https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512891#comment-16512891 ] ASF GitHub Bot commented on FLINK-9560: --- Github user etiennecarriere commented on the issue: https://github.com/apache/flink/pull/6149 @pnowojski, I added a unit test which validates the rate-limiting feature, but: * It adds 20s to the unit tests. Would it be better to move it to an integration test (even if it is not a true integration test)? * I allow 10 seconds for each test to have some stability. Open to your suggestions. > RateLimiting for FileSystem > --- > > Key: FLINK-9560 > URL: https://issues.apache.org/jira/browse/FLINK-9560 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Etienne CARRIERE >Priority: Major > > *Pain*: On our system, we see that during checkpoints, all the bandwidth is > taken to send the checkpoint to object storage (S3 in our case) > *Proposal*: After the creation of some limitations on FileSystem (mostly the > number of connections, with the tickets FLINK-8125/FLINK-8198/FLINK-9468), I > propose to add rate limiting "per FileSystem". > *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a > rate limiter on both the Input and OutputStream. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
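The proposal above — throttling a filesystem's streams to a bytes-per-second budget — can be sketched as a stream wrapper with a simple token bucket. This is a hedged sketch, not FLINK-9560's implementation: the `RateLimitedOutputStream` name is invented here, and the actual ticket proposes wiring a rate limiter into Flink's `LimitedConnectionsFileSystem`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Wraps an OutputStream and throttles writes to a bytes-per-second budget
// using a token bucket: tokens refill continuously, each write consumes
// tokens equal to its length, and a write blocks until enough tokens exist.
class RateLimitedOutputStream extends OutputStream {
    private final OutputStream out;
    private final long bytesPerSecond;
    private double tokens;           // currently available byte "tokens"
    private long lastRefillNanos;

    RateLimitedOutputStream(OutputStream out, long bytesPerSecond) {
        this.out = out;
        this.bytesPerSecond = bytesPerSecond;
        this.tokens = bytesPerSecond; // allow an initial burst of one second
        this.lastRefillNanos = System.nanoTime();
    }

    private void acquire(int n) throws IOException {
        while (true) {
            long now = System.nanoTime();
            tokens = Math.min(bytesPerSecond,
                    tokens + (now - lastRefillNanos) / 1e9 * bytesPerSecond);
            lastRefillNanos = now;
            if (tokens >= n) {
                tokens -= n;
                return;
            }
            try {
                Thread.sleep(1); // wait for the bucket to refill
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while throttling", e);
            }
        }
    }

    @Override
    public void write(int b) throws IOException {
        acquire(1);
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        acquire(len);
        out.write(b, off, len);
    }
}

public class RateLimitDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Generous budget so the demo finishes instantly; a real checkpoint
        // upload might be capped at, say, 50 MB/s.
        OutputStream limited = new RateLimitedOutputStream(sink, 10_000_000);
        limited.write("checkpoint-data".getBytes());
        System.out.println(sink.toString());
    }
}
```

A production version would likely use an existing limiter (e.g. Guava's `RateLimiter`) shared per FileSystem instance, which is essentially what the ticket suggests for `LimitedConnectionsFileSystem`.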
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512865#comment-16512865 ] ASF GitHub Bot commented on FLINK-9168: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5845 it is not possible to create a PR against a non-existing branch. We will either have to create a pulsar-connector branch up front, or open the first PR against master and merge it into a new branch. I would suggest the latter option. > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-5860: --- Assignee: Mahesh Senniappan (was: Yaroslav Mykhaylov) > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Mahesh Senniappan >Priority: Major > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static 
final String DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > 
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >FLINK_HDFS
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512860#comment-16512860 ] Chesnay Schepler commented on FLINK-5860: - I'll assign it to you. > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov >Priority: Major > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: 
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > 
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >
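The FLINK-5860 cleanup above replaces raw `java.io.tmpdir` lookups in tests with JUnit's `TemporaryFolder` rule. A stdlib-only sketch (class name and file names are illustrative, not from the Flink codebase) of why the shared system property is problematic, and of the create-and-delete cycle that the `TemporaryFolder` rule automates:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirDemo {
    public static void main(String[] args) throws IOException {
        // Shared location: every test (and every other process on the machine)
        // writes here, and nothing deletes leftovers -- the pattern being removed.
        String shared = System.getProperty("java.io.tmpdir");
        System.out.println("shared tmpdir is reused: " + (shared != null));

        // Per-test directory: unique path, deleted when the test finishes.
        // JUnit's TemporaryFolder rule wraps exactly this create/delete cycle.
        Path testDir = Files.createTempDirectory("flink-test-");
        Path data = Files.createFile(testDir.resolve("jarcreatortest.jar"));
        System.out.println("created: " + Files.exists(data));

        Files.delete(data);
        Files.delete(testDir);
        System.out.println("exists after cleanup: " + Files.exists(testDir));
    }
}
```

With the rule, the `createTempDirectory`/`delete` pair collapses to `@Rule public TemporaryFolder tmp = new TemporaryFolder();` and `tmp.newFile("jarcreatortest.jar")`, with cleanup handled by JUnit.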
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512859#comment-16512859 ] Chesnay Schepler commented on FLINK-5860: - [~maheshsenni] yes i believe this issue is still valid. > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov >Priority: Major > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' .
[jira] [Comment Edited] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502481#comment-16502481 ] Mahesh Senniappan edited comment on FLINK-5860 at 6/14/18 6:35 PM: --- Hello [~fhue...@gmail.com], Is this still available to work? I would like to pick this up if it is available. Edit: Added Fabian for visibility. was (Author: maheshsenni): Hello, is there someone working on this? If not, would you mind if I have a go at this? > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov >Priority: Major > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . 
[jira] [Created] (FLINK-9591) Remove remnants of distributed-cache logic
Chesnay Schepler created FLINK-9591: --- Summary: Remove remnants of distributed-cache logic Key: FLINK-9591 URL: https://issues.apache.org/jira/browse/FLINK-9591 Project: Flink Issue Type: Bug Components: Python API Affects Versions: 1.6.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.6.0 There are still some remnants in the python API for the old distributed cache logic, where we uploaded files first to a DFS before registering them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5843 merging. ---
[jira] [Commented] (FLINK-8744) Add annotations for documenting common/advanced options
[ https://issues.apache.org/jira/browse/FLINK-8744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512814#comment-16512814 ] ASF GitHub Bot commented on FLINK-8744: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5843 merging. > Add annotations for documenting common/advanced options > --- > > Key: FLINK-8744 > URL: https://issues.apache.org/jira/browse/FLINK-8744 > Project: Flink > Issue Type: New Feature > Components: Configuration, Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.6.0 > > > The {{ConfigDocsGenerator}} only generates [the full configuration > reference|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#full-reference]. > The > [common|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options] > and > [advanced|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#advanced-options] > sections are still manually defined in {{config.md}} and thus prone to being > outdated / out-of-sync with the full reference. > I propose adding {{@Common}}/{{@Advanced}} annotations based on which we > could generate these sections as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9530) Task numRecords metrics broken for chains
[ https://issues.apache.org/jira/browse/FLINK-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512801#comment-16512801 ] ASF GitHub Bot commented on FLINK-9530: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6126 merging. > Task numRecords metrics broken for chains > - > > Key: FLINK-9530 > URL: https://issues.apache.org/jira/browse/FLINK-9530 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > The {{numRecordsIn/Out}} metrics for tasks is currently broken. We are > wrongly adding up the numRecordsIn/Out metrics for all operators in the > chain, instead of just the head/tail operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
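The FLINK-9530 bug above can be illustrated with a self-contained sketch (the class and field names are illustrative, not Flink's actual internals): in an operator chain where every operator forwards each record, summing per-operator counters multiple-counts records, while the task-level `numRecordsIn`/`numRecordsOut` should come from the head and tail operators only.

```java
import java.util.Arrays;
import java.util.List;

public class ChainMetricsDemo {
    // Illustrative stand-in for one operator's numRecordsIn/Out counters.
    static final class Op {
        final long recordsIn;
        final long recordsOut;
        Op(long in, long out) { this.recordsIn = in; this.recordsOut = out; }
    }

    public static void main(String[] args) {
        // A chain source -> map -> sink: each operator forwards every record,
        // so intermediate counters duplicate what head/tail already count.
        List<Op> chain = Arrays.asList(new Op(100, 100), new Op(100, 100), new Op(100, 100));

        // Buggy task-level metric: adds up every operator in the chain.
        long buggyIn = chain.stream().mapToLong(op -> op.recordsIn).sum();

        // Fixed: the task's numRecordsIn is the head operator's counter,
        // and numRecordsOut is the tail operator's counter.
        long fixedIn = chain.get(0).recordsIn;
        long fixedOut = chain.get(chain.size() - 1).recordsOut;

        System.out.println("buggy numRecordsIn: " + buggyIn);   // triple-counted
        System.out.println("fixed numRecordsIn: " + fixedIn);
        System.out.println("fixed numRecordsOut: " + fixedOut);
    }
}
```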
[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6126 merging. ---
[jira] [Commented] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code
[ https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512796#comment-16512796 ] ASF GitHub Bot commented on FLINK-9257: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6053 merging. > End-to-end tests prints "All tests PASS" even if individual test-script > returns non-zero exit code > -- > > Key: FLINK-9257 > URL: https://issues.apache.org/jira/browse/FLINK-9257 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Critical > Fix For: 1.6.0 > > > In some cases the test-suite exits with non-zero exit code but still prints > "All tests PASS" to stdout. This happens because how the test runner works, > which is roughly as follows > # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of > tests consisting of one multiple bash scripts. > # As soon as one of those bash scripts exists with non-zero exit code, the > tests won't continue to run and the test-suite will also exit with non-zero > exit code. > # *During the cleanup hook (trap cleanup EXIT in common.sh) it will be > checked whether there are non-empty out files or log files with certain > exceptions. If a tests fails with non-zero exit code, but does not have any > exceptions or .out files, this will still print "All tests PASS" to stdout, > even though they don't* > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass" messag...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6053 merging. ---
[jira] [Created] (FLINK-9590) HistogramDump should be immutable
Chesnay Schepler created FLINK-9590: --- Summary: HistogramDump should be immutable Key: FLINK-9590 URL: https://issues.apache.org/jira/browse/FLINK-9590 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.0, 1.6.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.6.0 The {{HistogramDump}} represents the contents of a histogram at one point in time, and should thus not be mutable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
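The immutability pattern FLINK-9590 asks for is the standard one for point-in-time snapshots: final fields plus defensive copies of any array state. A minimal sketch, assuming hypothetical field names (the real `HistogramDump` members may differ):

```java
public class ImmutableDumpDemo {
    // Illustrative immutable snapshot; field names are assumptions,
    // not the actual HistogramDump members.
    static final class HistogramSnapshot {
        private final long count;
        private final long[] values;

        HistogramSnapshot(long count, long[] values) {
            this.count = count;
            // Defensive copy: later mutation of the caller's array
            // cannot change this snapshot.
            this.values = values.clone();
        }

        long getCount() { return count; }

        long[] getValues() {
            // Copy on the way out too, so callers cannot mutate internals.
            return values.clone();
        }
    }

    public static void main(String[] args) {
        long[] raw = {1, 2, 3};
        HistogramSnapshot snap = new HistogramSnapshot(3, raw);
        raw[0] = 99; // does not affect the snapshot
        System.out.println("first value: " + snap.getValues()[0]);
        System.out.println("count: " + snap.getCount());
    }
}
```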
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512699#comment-16512699 ] ASF GitHub Bot commented on FLINK-9168: --- Github user XiaoZYang commented on the issue: https://github.com/apache/flink/pull/5845 @hsaputra There is no error, but the issue is: 1. when I create a PR against the flink repo from my fork, I have to choose a branch into which the commits will be merged; 2. I want the commits merged into a branch named 'pulsar-connector', but I can't create a new branch in the flink repo. Is there a better way to do that? Thank you for helping me! > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
Github user XiaoZYang commented on the issue: https://github.com/apache/flink/pull/5845 @hsaputra There is no error, but the issue is: 1. when I create a PR against the flink repo from my fork, I have to choose a branch into which the commits will be merged; 2. I want the commits merged into a branch named 'pulsar-connector', but I can't create a new branch in the flink repo. Is there a better way to do that? Thank you for helping me! ---
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512689#comment-16512689 ] ASF GitHub Bot commented on FLINK-9168: --- Github user hsaputra commented on the issue: https://github.com/apache/flink/pull/5845 @XiaoZYang You can close this PR and create a new branch to submit a new PR, since you are the creator of this one. Did you see any error or something preventing you from closing this one? > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
Github user hsaputra commented on the issue: https://github.com/apache/flink/pull/5845 @XiaoZYang You can close this PR and create a new branch to submit a new PR, since you are the creator of this one. Did you see any error or something preventing you from closing this one? ---
[jira] [Commented] (FLINK-9494) Race condition in Dispatcher with concurrent granting and revoking of leadership
[ https://issues.apache.org/jira/browse/FLINK-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512686#comment-16512686 ] ASF GitHub Bot commented on FLINK-9494: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6155 > Race condition in Dispatcher with concurrent granting and revoking of > leadership > - > > Key: FLINK-9494 > URL: https://issues.apache.org/jira/browse/FLINK-9494 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > The {{Dispatcher}} contains a race condition when an instance is granted > leadership and then quickly afterwards gets the leadership revoked. The > problem is that we don't check in the recovered jobs future callback that we > still have the leadership. This can lead to a corrupted state of the > {{Dispatcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9573) Check for leadership with leader session id
[ https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9573. Resolution: Fixed Fix Version/s: 1.5.1 1.6.0 Fixed via 1.6.0: 363de6b643689f64564270857f1daf7f6c59257f 1.5.1: 7286d0bc90b23e070a5e91e84325ad6a92f09bfa > Check for leadership with leader session id > --- > > Key: FLINK-9573 > URL: https://issues.apache.org/jira/browse/FLINK-9573 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > In order to check whether a {{LeaderContender}} is still the leader, it is > not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. > Instead, we should extend this method to also take the leader session id as a > parameter to distinguish between different calls from the same leader > contender with different leader session ids. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9573) Check for leadership with leader session id
[ https://issues.apache.org/jira/browse/FLINK-9573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512687#comment-16512687 ] ASF GitHub Bot commented on FLINK-9573: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6154 > Check for leadership with leader session id > --- > > Key: FLINK-9573 > URL: https://issues.apache.org/jira/browse/FLINK-9573 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > In order to check whether a {{LeaderContender}} is still the leader, it is > not sufficient to simply provide a {{LeaderElectionService#hasLeadership()}}. > Instead, we should extend this method to also take the leader session id as a > parameter to distinguish between different calls from the same leader > contender with different leader session ids. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
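The extension FLINK-9573 describes can be sketched in a few lines (the quoted issue names only `LeaderElectionService#hasLeadership()`; this stand-in class and its plumbing are assumptions, not Flink's actual interface): a plain boolean check cannot distinguish two grants to the same contender, so the caller passes the leader session id it was granted and the service compares it to the current one.

```java
import java.util.Objects;
import java.util.UUID;

public class LeadershipCheckDemo {
    // Minimal stand-in for a leader election service.
    static final class ElectionService {
        private volatile UUID currentSessionId; // null = no current leader

        void grantLeadership(UUID sessionId) { currentSessionId = sessionId; }
        void revokeLeadership() { currentSessionId = null; }

        // hasLeadership extended with the session id: a callback holding a
        // stale id from an earlier grant no longer passes this check.
        boolean hasLeadership(UUID leaderSessionId) {
            return Objects.equals(currentSessionId, leaderSessionId);
        }
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService();
        UUID first = UUID.randomUUID();
        service.grantLeadership(first);
        System.out.println("first grant valid: " + service.hasLeadership(first));

        // Leadership revoked and re-granted with a new session id: a recovery
        // callback still holding the old id must see that it lost leadership.
        service.revokeLeadership();
        UUID second = UUID.randomUUID();
        service.grantLeadership(second);
        System.out.println("stale session still leader: " + service.hasLeadership(first));
        System.out.println("new session leader: " + service.hasLeadership(second));
    }
}
```

This is exactly the guard missing in the FLINK-9494 race: the recovered-jobs callback would call `hasLeadership(itsSessionId)` before mutating dispatcher state.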
[GitHub] flink pull request #6154: [FLINK-9573] Extend LeaderElectionService#hasLeade...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6154 ---
[jira] [Closed] (FLINK-9494) Race condition in Dispatcher with concurrent granting and revoking of leadership
[ https://issues.apache.org/jira/browse/FLINK-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9494. Resolution: Fixed Fixed via 1.6.0: 7e51b90d909c6feaac6ed48140df00372e95a45c 1.5.1: a9787d7fb616d231007b7c82e2f5f526370eddf8 > Race condition in Dispatcher with concurrent granting and revoking of > leadership > - > > Key: FLINK-9494 > URL: https://issues.apache.org/jira/browse/FLINK-9494 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > The {{Dispatcher}} contains a race condition when an instance is granted > leadership and then quickly afterwards gets the leadership revoked. The > problem is that we don't check in the recovered jobs future callback that we > still have the leadership. This can lead to a corrupted state of the > {{Dispatcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6155 ---
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512682#comment-16512682 ] ASF GitHub Bot commented on FLINK-9168: --- Github user XiaoZYang commented on the issue: https://github.com/apache/flink/pull/5845 I prefer to open a new PR and a new branch. But I am not authorized to do that. > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
Github user XiaoZYang commented on the issue: https://github.com/apache/flink/pull/5845 I prefer to open a new PR and a new branch. But I am not authorized to do that. ---
[jira] [Created] (FLINK-9589) PythonOperationInfo should be immutable
Chesnay Schepler created FLINK-9589: --- Summary: PythonOperationInfo should be immutable Key: FLINK-9589 URL: https://issues.apache.org/jira/browse/FLINK-9589 Project: Flink Issue Type: Bug Components: Python API Affects Versions: 1.4.2, 1.5.0, 1.3.3, 1.2.1, 1.6.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.6.0 The {{PythonOperationInfo}} is a simple represantation of a dataset operation defined by the python plan. Thus the entire object should be immutable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512652#comment-16512652 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195464823 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ [XML markup stripped by the archive; the recoverable content: a new flink-confluent-schema-registry module pom under parent flink-end-to-end-tests (org.apache.flink, 1.6-SNAPSHOT), with dependencies on flink-core, flink-clients, flink-scala, flink-streaming-scala, the Kafka 0.10 and 0.8 connectors, flink-avro, avro, io.confluent:kafka-avro-serializer 4.1.0, and tech.allegro.schema.json2avro:converter 0.2.6, plus a build-jar profile running maven-shade-plugin 2.4.1 that excludes org.apache.flink:force-shading, com.google.code.findbugs:jsr305, and org.slf4j:* from the shaded jar.]
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512661#comment-16512661 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195471771 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; --- End diff -- Same here with non-serializable fields which should be marked as `transient`. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512668#comment-16512668 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195474165 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; + + private SchemaRegistryClient schemaRegistryClient; + + private JsonAvroConverter jsonAvroConverter; + + private final Class avroType; + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, String topicName) { --- End diff -- The serialization format should not need to know about the `topicName` which is used here for the schema lookup. Better to use `KafkaAvroEncoder` which automatically retrieves the schema name. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512648#comment-16512648 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195447395 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/TestAvroConsumerConfluent.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; + +import example.avro.User; + +import java.util.Properties; + +/** + * A simple example that shows how to read from and write to Kafka with Confluent Schema Registry. + * This will read AVRO messages from the input topic, parse them into a POJO type via checking the Schema by calling Schema registry. 
+ * Then this example publishes the POJO type to Kafka by converting the POJO to Avro and verifying the schema. + * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer + */ +public class TestAvroConsumerConfluent { + + public static void main(String[] args) throws Exception { + Properties config = new Properties(); + // parse input arguments + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 6) { + System.out.println("Missing parameters!\n" + + "Usage: Kafka --input-topic --output-topic " + + "--bootstrap.servers " + + "--zookeeper.connect " + + "--schema-registry-url --group.id "); + return; + } + config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers")); + config.setProperty("group.id", parameterTool.getRequired("group.id")); + config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect")); + String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + DataStreamSource input = env + .addSource( + new FlinkKafkaConsumer010( + parameterTool.getRequired("input-topic"), + new AvroDeserializationConfluentSchema(User.class, schemaRegistryUrl), --- End diff -- Raw usage of `AvroDeserializationConfluentSchema`; better to use `AvroDeserializationConfluentSchema<>`. Then we can also remove `User` in `FlinkKafkaConsumer010`. 
> End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type.
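The raw-type point in the review above can be illustrated with a stdlib-only sketch. The generic `Schema` class below is a hypothetical stand-in for `AvroDeserializationConfluentSchema<T>`, not the Flink API: with the diamond operator the compiler infers the type argument from the declared type, so the caller (the analogue of `FlinkKafkaConsumer010`) no longer needs to repeat `User`.

```java
public class DiamondDemo {
	// Hypothetical stand-in for AvroDeserializationConfluentSchema<T>;
	// it illustrates raw vs. parameterized usage, not the Flink API itself.
	static class Schema<T> {
		final Class<T> type;

		Schema(Class<T> type) {
			this.type = type;
		}

		T cast(Object o) {
			// Checked cast through the Class token -- type-safe at runtime.
			return type.cast(o);
		}
	}

	public static void main(String[] args) {
		// Raw usage compiles only with an unchecked warning and loses type safety:
		//   Schema schema = new Schema(String.class);

		// Parameterized usage with the diamond operator: the type argument
		// is inferred once from the declared type on the left-hand side.
		Schema<String> schema = new Schema<>(String.class);
		System.out.println(schema.cast("hello")); // prints "hello"
	}
}
```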
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512649#comment-16512649 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195453836 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-scala_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${project.version} + --- End diff -- I think we can replace the two Scala dependencies above with ``` org.apache.flink flink-streaming-java_${scala.binary.version} ${project.version} ``` > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512666#comment-16512666 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195473241 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; + + private SchemaRegistryClient schemaRegistryClient; + + private JsonAvroConverter jsonAvroConverter; + + private final Class avroType; + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, String topicName) { + this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName); + } + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) { + this.schemaRegistryUrl = schemaRegistryUrl; + this.identityMapCapacity = identityMapCapacity; + this.topicName = topicName; + this.avroType = avroType; + } + + @Override + public byte[] serialize(T obj) { + byte[] serializedBytes = null; + + try { + if (kafkaAvroSerializer == null) { + this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity); + this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient); + } + + String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema(); --- End diff -- Why do we have to make this schema lookup here? Shouldn't the `kafkaAvroSerializer` do the job for us? > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. 
In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512656#comment-16512656 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195465238 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDecoder; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +import java.io.IOException; + +/** + * The deserialization schema for the Avro type. 
+ */ +public class AvroDeserializationConfluentSchema implements DeserializationSchema { + + private static final long serialVersionUID = 1L; + + private Class avroType; + private final String schemaRegistryUrl; + private final int identityMapCapacity; + private KafkaAvroDecoder kafkaAvroDecoder; --- End diff -- Not serializable. Please mark as `transient`. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
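The `transient` pattern the reviewer asks for can be sketched with stdlib stand-ins. The `HeavyDecoder` class below is hypothetical (in the real schema it would be the Confluent `KafkaAvroDecoder`); the point is that the non-serializable field is marked `transient` and re-created lazily on first use, after the schema object has been shipped to a task manager via Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable dependency such as KafkaAvroDecoder.
class HeavyDecoder {
	String decode(byte[] bytes) {
		return new String(bytes);
	}
}

public class LazyInitSchema implements Serializable {
	private static final long serialVersionUID = 1L;

	private final String registryUrl;       // serializable config travels with the object
	private transient HeavyDecoder decoder; // NOT serialized; rebuilt on the worker

	LazyInitSchema(String registryUrl) {
		this.registryUrl = registryUrl;
	}

	String deserialize(byte[] message) {
		if (decoder == null) { // null after Java deserialization
			decoder = new HeavyDecoder();
		}
		return decoder.decode(message);
	}

	public static void main(String[] args) throws Exception {
		LazyInitSchema schema = new LazyInitSchema("http://localhost:8081");

		// Round-trip through Java serialization, as Flink does when it
		// ships the schema from the client to the task managers.
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream oos = new ObjectOutputStream(bos);
		oos.writeObject(schema);
		oos.close();
		ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
		LazyInitSchema shipped = (LazyInitSchema) in.readObject();

		// The transient decoder is gone after the round trip but is
		// re-created on first use, so deserialize() still works.
		System.out.println(shipped.deserialize("hello".getBytes())); // prints "hello"
	}
}
```

Without the `transient` marker, the round trip above would fail with a `NotSerializableException` as soon as the decoder field was non-null.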
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512670#comment-16512670 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195477424 --- Diff: flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh --- @@ -0,0 +1,106 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh + +function verify_output { + local expected=$(printf $1) + + if [[ "$2" != "$expected" ]]; then +echo "Output from Flink program does not match expected output." 
+echo -e "EXPECTED FOR KEY: --$expected--" +echo -e "ACTUAL: --$2--" +PASS="" +exit 1 + fi +} + +function test_cleanup { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + stop_kafka_cluster + stop_confluent_schema_registry + + # revert our modifications to the Flink distribution + mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml + + # make sure to run regular cleanup as well + cleanup +} + +trap test_cleanup INT +trap test_cleanup EXIT + +setup_kafka_dist +setup_confluent_dist + +cd flink-end-to-end-tests/flink-confluent-schema-registry +mvn clean package -Pbuild-jar -nsu + +start_kafka_cluster +start_confluent_schema_registry +sleep 5 + +# modify configuration to use port 8082 for Flink +cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak +sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml + +TEST_PROGRAM_JAR=target/flink-confluent-schema-registry-1.6-SNAPSHOT.jar + +INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}' +INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}' +INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}' +USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}' + +curl -X POST \ + http://localhost:8081/subjects/users-value/versions \ + -H 'cache-control: no-cache' \ + -H 'content-type: application/vnd.schemaregistry.v1+json' \ + -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", 
\"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}' --- End diff -- Do we have to manually register the schema, or does it also work by sending the first record to the input topic? > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry.
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512667#comment-16512667 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195473522 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; --- End diff -- Could be final, I guess. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512650#comment-16512650 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195464245 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-scala_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.10_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.8_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-avro + ${project.version} + + + + org.apache.avro + avro + + + io.confluent + kafka-avro-serializer + ${confluent.version} + --- End diff -- Why do we need this dependency? Can't we use Flink's Avro serializer? > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. 
In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512646#comment-16512646 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195463614 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-scala_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.10_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.8_${scala.binary.version} + ${project.version} + --- End diff -- For what do we need this dependency? Ideally, we should drop it. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512654#comment-16512654 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195464899 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-scala_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.10_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.8_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-avro + ${project.version} + + + + org.apache.avro + avro + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + + tech.allegro.schema.json2avro + converter + ${json2avro.version} + + + + + + + build-jar + + false + + + + + src/main/resources/${resource.dir} + true + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + + + + +
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512659#comment-16512659 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195466309 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDecoder; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +import java.io.IOException; + +/** + * The deserialization schema for the Avro type. 
+ */ +public class AvroDeserializationConfluentSchema implements DeserializationSchema { + + private static final long serialVersionUID = 1L; + + private Class avroType; + private final String schemaRegistryUrl; + private final int identityMapCapacity; + private KafkaAvroDecoder kafkaAvroDecoder; + + private ObjectMapper mapper; + + private JsonAvroConverter jsonAvroConverter; + + public AvroDeserializationConfluentSchema(Class avroType, String schemaRegistyUrl) { + this(avroType, schemaRegistyUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); + } + + public AvroDeserializationConfluentSchema(Class avroType, String schemaRegistryUrl, int identityMapCapacity) { + this.avroType = avroType; + this.schemaRegistryUrl = schemaRegistryUrl; + this.identityMapCapacity = identityMapCapacity; + } + + @Override + public T deserialize(byte[] message) throws IOException { + if (kafkaAvroDecoder == null) { + SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity); + this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistryClient); + } + if (mapper == null) { + this.mapper = new ObjectMapper(); + } + + if (jsonAvroConverter == null) { + jsonAvroConverter = new JsonAvroConverter(); + } + GenericData.Record record = (GenericData.Record) this.kafkaAvroDecoder.fromBytes(message); + byte[] messageBytes = jsonAvroConverter.convertToJson(record); + return (T) this.mapper.readValue(messageBytes, avroType); --- End diff -- Why do we have to introduce this indirection with Json? Converting an Avro record into Json and then into the specific types seems quite cumbersome. Better to directly read the record from the `message`. 
> End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests > Reporter: Till Rohrmann > Assignee: Yazdan Shirvany > Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to set up a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
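The suggestion in the review comment above can be sketched as follows: hand the raw `message` straight to Confluent's `KafkaAvroDeserializer` and get the generated `SpecificRecord` type back, with no Avro-to-JSON-to-POJO detour through Jackson. This is a hypothetical rewrite, not code from the PR; the field names (`avroType`, `schemaRegistryUrl`, `identityMapCapacity`) mirror the class under review, and a `kafkaAvroDeserializer` field is assumed in place of the decoder.

```java
// Hypothetical rewrite of deserialize(), assuming T is a generated Avro
// SpecificRecord: with specific.avro.reader=true the Confluent deserializer
// returns the generated class directly, so no JSON round-trip is needed.
@Override
public T deserialize(byte[] message) throws IOException {
    if (kafkaAvroDeserializer == null) {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient(schemaRegistryUrl, identityMapCapacity);
        kafkaAvroDeserializer = new KafkaAvroDeserializer(client,
                Collections.singletonMap(
                        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
    }
    // The writer schema id is embedded in the message itself, so the topic
    // argument is not needed to resolve it.
    return avroType.cast(kafkaAvroDeserializer.deserialize(null, message));
}
```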
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512662#comment-16512662 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195472317 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; + + private SchemaRegistryClient schemaRegistryClient; + + private JsonAvroConverter jsonAvroConverter; + + private final Class avroType; + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, String topicName) { + this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName); + } + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) { + this.schemaRegistryUrl = schemaRegistryUrl; + this.identityMapCapacity = identityMapCapacity; + this.topicName = topicName; + this.avroType = avroType; + } + + @Override + public byte[] serialize(T obj) { + byte[] serializedBytes = null; + + try { + if (kafkaAvroSerializer == null) { + this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity); + this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient); + } + + String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema(); + + if (jsonAvroConverter == null) { + jsonAvroConverter = new JsonAvroConverter(); + } + + //System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema); --- End diff -- Remove unused code > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent 
schema registry. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512645#comment-16512645 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195452831 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 --- End diff -- Could be inherited from the parent. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512658#comment-16512658 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195465492 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDecoder; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +import java.io.IOException; + +/** + * The deserialization schema for the Avro type. 
+ */ +public class AvroDeserializationConfluentSchema implements DeserializationSchema { + + private static final long serialVersionUID = 1L; + + private Class avroType; --- End diff -- `final` would be better > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512669#comment-16512669 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195473423 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; + + private SchemaRegistryClient schemaRegistryClient; + + private JsonAvroConverter jsonAvroConverter; + + private final Class avroType; --- End diff -- This field is not used. > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512664#comment-16512664 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195472677 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */ +public class AvroSerializationConfluentSchema implements SerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String schemaRegistryUrl; + + private final int identityMapCapacity; + + private KafkaAvroSerializer kafkaAvroSerializer; + + private String topicName; + + private SchemaRegistryClient schemaRegistryClient; + + private JsonAvroConverter jsonAvroConverter; + + private final Class avroType; + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, String topicName) { + this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName); + } + + public AvroSerializationConfluentSchema(Class avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) { + this.schemaRegistryUrl = schemaRegistryUrl; + this.identityMapCapacity = identityMapCapacity; + this.topicName = topicName; + this.avroType = avroType; + } + + @Override + public byte[] serialize(T obj) { + byte[] serializedBytes = null; + + try { + if (kafkaAvroSerializer == null) { + this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity); + this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient); + } + + String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema(); + + if (jsonAvroConverter == null) { + jsonAvroConverter = new JsonAvroConverter(); + } + + //System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema); + GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new Schema.Parser().parse(schema)); --- End diff -- Why can't we simply pass `obj` to the `kafkaAvroSerializer`? 
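The question in the review comment above suggests this simplification: a generated Avro `SpecificRecord` already carries its own schema, so it can go straight to the `KafkaAvroSerializer` without the `toString()`-to-JSON-to-`GenericRecord` detour. A hypothetical sketch, reusing the field names from the class under review:

```java
// Hypothetical simplification of serialize(): pass obj directly.
// KafkaAvroSerializer registers the record's own schema under the subject
// "<topicName>-value" and encodes against it, so the separate
// getLatestSchemaMetadata() lookup and JSON conversion both disappear.
@Override
public byte[] serialize(T obj) {
    if (kafkaAvroSerializer == null) {
        schemaRegistryClient =
                new CachedSchemaRegistryClient(schemaRegistryUrl, identityMapCapacity);
        kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
    }
    return kafkaAvroSerializer.serialize(topicName, obj);
}
```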
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512660#comment-16512660 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195469740 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDecoder; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +import java.io.IOException; + +/** + * The deserialization schema for the Avro type. 
+ */ +public class AvroDeserializationConfluentSchema implements DeserializationSchema { + + private static final long serialVersionUID = 1L; + + private Class avroType; + private final String schemaRegistryUrl; + private final int identityMapCapacity; + private KafkaAvroDecoder kafkaAvroDecoder; + + private ObjectMapper mapper; + + private JsonAvroConverter jsonAvroConverter; + + public AvroDeserializationConfluentSchema(Class avroType, String schemaRegistyUrl) { + this(avroType, schemaRegistyUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); + } + + public AvroDeserializationConfluentSchema(Class avroType, String schemaRegistryUrl, int identityMapCapacity) { + this.avroType = avroType; + this.schemaRegistryUrl = schemaRegistryUrl; + this.identityMapCapacity = identityMapCapacity; + } + + @Override + public T deserialize(byte[] message) throws IOException { + if (kafkaAvroDecoder == null) { + SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity); + this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistryClient); --- End diff -- Why do we use the `KafkaAvroDecoder` instead of the `KafkaAvroDeserializer`? > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512647#comment-16512647 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195452794 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 --- End diff -- Nowhere used > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512671#comment-16512671 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195476574 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/example/avro/User.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package example.avro; + +/** + * Autogenerated by Avro + * DO NOT EDIT DIRECTLY + */ + +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.SchemaStore; +import org.apache.avro.specific.SpecificData; +/** +**/ + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { --- End diff -- Can we automatically generate these classes by using Maven's `generate-sources` phase? 
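The `generate-sources` suggestion above would typically be implemented with the `avro-maven-plugin`, so the generated `example.avro.User` class no longer needs to be checked in. A hypothetical pom fragment (the `.avsc` location and `${avro.version}` property are assumptions, not taken from the PR):

```xml
<!-- Sketch: generate example.avro.User from src/main/avro/*.avsc at build time -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```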
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512651#comment-16512651 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195464034 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml --- @@ -0,0 +1,228 @@ + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-confluent-schema-registry + + UTF-8 + 2.11 + 4.1.0 + 0.2.6 + 1.8 + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-scala_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.10_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kafka-0.8_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-avro + ${project.version} + + + + org.apache.avro + avro + --- End diff -- For what do we need this dependency? > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. 
In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512663#comment-16512663 ] ASF GitHub Bot commented on FLINK-8983: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6083#discussion_r195472489 --- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.serialization.SerializationSchema; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +/** + * The serialization schema for the Avro type. 
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
+
+	private SchemaRegistryClient schemaRegistryClient;
+
+	private JsonAvroConverter jsonAvroConverter;
+
+	private final Class<T> avroType;
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+		this(avroType, schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+	}
+
+	public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+		this.topicName = topicName;
+		this.avroType = avroType;
+	}
+
+	@Override
+	public byte[] serialize(T obj) {
+		byte[] serializedBytes = null;
+
+		try {
+			if (kafkaAvroSerializer == null) {
+				this.schemaRegistryClient = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+				this.kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
+			}
+
+			String schema = schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+			if (jsonAvroConverter == null) {
+				jsonAvroConverter = new JsonAvroConverter();
+			}
+
+			//System.out.println("Schema fetched from Schema Registry for topic :" + topicName + " = " + schema);
+			GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new Schema.Parser().parse(schema));
+
+			if (GenericData.get().validate(new Schema.Parser().parse(schema), record)) {
+				serializedBytes = kafkaAvroSerializer.serialize(topicName, record);
+
+			} else {
+				System.out.println("Error :Invalid message : Doesn't follow the avro schema : Message not published to the topic, message = "
+					+ record.toString());
+
+			}
+
+		} catch (Exception ex) {
+			ex.printStackTrace();
--- End diff --

No proper exception handling.

> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
>
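The fix the reviewer is asking for can be sketched with plain-JDK stand-ins (`doSerialize` below is hypothetical and stands in for the schema-registry and Kafka calls): rather than catching `Exception`, printing the stack trace, and returning `null`, wrap the checked exception so the failure propagates to the Flink runtime.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerializeFailFast {

    // Hypothetical stand-in for the schema-registry/Kafka calls in serialize().
    static byte[] doSerialize(String payload) throws IOException {
        if (payload == null) {
            throw new IOException("record does not match the Avro schema");
        }
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Instead of `catch (Exception ex) { ex.printStackTrace(); }` followed by
    // returning null, wrap the checked exception in an unchecked one so the
    // failure reaches the Flink runtime and the job fails visibly.
    static byte[] serialize(String payload) {
        try {
            return doSerialize(payload);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not serialize record", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serialize("hello").length);
        try {
            serialize(null);
        } catch (UncheckedIOException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```

Returning `null` from `serialize` silently drops records; rethrowing makes the job fail fast and surfaces the root cause in the task manager logs.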
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512665#comment-16512665 ]

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472438

--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
[... same class body as quoted in the previous diff, down to the invalid-record branch ...]
+			} else {
+				System.out.println("Error :Invalid message : Doesn't follow the avro schema : Message not published to the topic, message = "
+					+ record.toString());
--- End diff --

No println logging. Better to use proper loggers.

> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
> Issue Type:
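A minimal sketch of the suggested change (names are hypothetical; `java.util.logging` stands in here so the example is self-contained, while Flink's own code would use an SLF4J logger obtained via `LoggerFactory.getLogger(...)`):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class RecordLogger {

    // In Flink this would be the SLF4J pattern:
    //   private static final Logger LOG = LoggerFactory.getLogger(AvroSerializationConfluentSchema.class);
    // java.util.logging is used here only so the sketch compiles standalone.
    private static final Logger LOG = Logger.getLogger(RecordLogger.class.getName());

    static boolean publishIfValid(boolean matchesSchema, String record) {
        if (!matchesSchema) {
            // A levelled, parameterized log line instead of System.out.println:
            // operators can filter and route it, and it lands in the task logs.
            LOG.log(Level.WARNING, "Record does not match the Avro schema, not published: {0}", record);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(publishIfValid(false, "{\"name\":1}"));
    }
}
```

Unlike `System.out.println`, a logger call carries a level and a logger name, so the warning can be suppressed, redirected, or correlated with the rest of the job's logs.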
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512657#comment-16512657 ]

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465338

--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java ---
@@ -0,0 +1,85 @@
+/*
[... Apache License header, identical to the one in the previous diff ...]
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
+	private final String schemaRegistryUrl;
+	private final int identityMapCapacity;
+	private KafkaAvroDecoder kafkaAvroDecoder;
+
+	private ObjectMapper mapper;
+
+	private JsonAvroConverter jsonAvroConverter;
--- End diff --

`transient`

> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector, Tests
> Reporter: Till Rohrmann
> Assignee: Yazdan Shirvany
> Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able
> to work together with the Confluent schema registry. In order to do that we
> have to setup a Kafka cluster and write a Flink job which reads from the
> Confluent schema registry producing an Avro type.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
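The reviewer's `transient` remark: fields like `kafkaAvroDecoder`, `mapper`, and `jsonAvroConverter` are lazily created, non-serializable helpers, so they should be marked `transient` and rebuilt after the schema object is shipped to a task manager. A stdlib-only sketch of the pattern (class and field names are hypothetical stand-ins):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String schemaRegistryUrl;

    // Stand-in for kafkaAvroDecoder / mapper / jsonAvroConverter: marked
    // transient so Java serialization skips it; it is rebuilt lazily on
    // first use after deserialization.
    private transient StringBuilder decoder;

    TransientDemo(String url) {
        this.schemaRegistryUrl = url;
    }

    StringBuilder decoder() {
        if (decoder == null) {          // null again after deserialization
            decoder = new StringBuilder(schemaRegistryUrl);
        }
        return decoder;
    }

    // Java-serialize and deserialize the instance, as Flink effectively does
    // when distributing a (De)serializationSchema to task managers.
    static TransientDemo roundTrip(TransientDemo in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (TransientDemo) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TransientDemo copy = roundTrip(new TransientDemo("http://registry:8081"));
        System.out.println(copy.decoder());
    }
}
```

Only the serializable configuration (`schemaRegistryUrl` and friends) travels over the wire; the heavyweight helpers are recreated per task instance.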
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512653#comment-16512653 ]

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464456

--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+flink-end-to-end-tests
+org.apache.flink
+1.6-SNAPSHOT
+
+4.0.0
+
+flink-confluent-schema-registry
+
+UTF-8
+2.11
+4.1.0
+0.2.6
+1.8
+
+
+
+org.apache.flink
+flink-core
+${project.version}
+
+
+org.apache.flink
+flink-clients_${scala.binary.version}
+${project.version}
+
+org.apache.flink
+flink-scala_${scala.binary.version}
+${project.version}
+
+org.apache.flink
+flink-streaming-scala_${scala.binary.version}
+${project.version}
+
+
+org.apache.flink
+flink-connector-kafka-0.10_${scala.binary.version}
+${project.version}
+
+
+org.apache.flink
+flink-connector-kafka-0.8_${scala.binary.version}
+${project.version}
+
+
+org.apache.flink
+flink-avro
+${project.version}
+
+
+org.apache.avro
+avro
+
+io.confluent
+kafka-avro-serializer
+${confluent.version}
+
+
+tech.allegro.schema.json2avro
+converter
+${json2avro.version}
+
--- End diff --

Why do we need this dependency?

> End-to-end test: Confluent schema registry
> --
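On the reviewer's dependency question: `org.apache.avro`, already on the module's classpath, can decode JSON into a `GenericRecord` by itself, which is one possible answer to whether `json2avro` is needed. The caveat, and the likely reason the converter was pulled in, is that Avro's `JsonDecoder` accepts only Avro's JSON encoding (e.g. union values wrapped as `{"string": ...}`), not arbitrary JSON. A sketch under that assumption:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class JsonToAvro {

    // Parse a JSON string into a GenericRecord using only org.apache.avro.
    // Note: DecoderFactory.jsonDecoder expects Avro's JSON encoding, which
    // matches plain JSON only for schemas without unions.
    static GenericRecord parse(String schemaJson, String recordJson) throws Exception {
        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        return reader.read(null, DecoderFactory.get().jsonDecoder(schema, recordJson));
    }

    public static void main(String[] args) throws Exception {
        String schema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}";
        GenericRecord r = parse(schema, "{\"name\":\"flink\"}");
        System.out.println(r.get("name"));
    }
}
```

If the test data avoids unions and optional fields, the extra `tech.allegro.schema.json2avro` dependency could be dropped in favor of this built-in decoder.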
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512655#comment-16512655 ]

ASF GitHub Bot commented on FLINK-8983:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465088

--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
[... same pom content as quoted in the previous comment, through the json2avro converter dependency ...]
+
+
+build-jar
+
+false
+
+
+src/main/resources/${resource.dir}
+true
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+2.4.1
+
+
+package
+
+shade
+
+
+org.apache.flink:force-shading
+com.google.code.findbugs:jsr305
+org.slf4j:*
+