[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688024#comment-16688024 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233824356

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java

Quoted diff (excerpt):

    /**
     * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point,
     * for example after a period of being idle or after n messages have been received.
     */
    public class BoundedPubSubSource extends PubSubSource {

Review comment:

    So wait, this class only exists for testing? Why is it not under `src/test` in that case?

> PubSub connector
>
>                 Key: FLINK-9311
>                 URL: https://issues.apache.org/jira/browse/FLINK-9311
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Richard Deurwaarder
>            Assignee: Niels Basjes
>            Priority: Minor
>              Labels: pull-request-available
>
> I would like to start adding some Google Cloud connectors, starting with a PubSub Source.
> I have a basic implementation ready, but I want it to be able to:
> * easily scale up (should I have it extend RichParallelSourceFunction for this?)
> * make it easier to provide the Google Cloud credentials. This would require being able
>   to send some JSON string / ServiceAccount to the nodes when starting up this source.
> Could this be something that would be useful for others and added to the flink connectors repo?
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688020#comment-16688020 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233831047

File path: flink-examples/flink-examples-streaming/pom.xml

Quoted diff (excerpt): the new "PubSub" shade-plugin execution bound to the package phase, with main class org.apache.flink.streaming.examples.pubsub.PubSubExample, final name PubSub, an include for org.apache.flink:flink-connector-pubsub_${scala.binary.version}, and a filter for org/apache/flink/streaming/connectors/pubsub/**.

Review comment:

    indentation
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688021#comment-16688021 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233826416

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java

Quoted diff (excerpt):

    /**
     * Wrapper class for CredentialsProvider to make it Serializable. This can be used to pass on Credentials to SourceFunctions.
     */
    public class SerializableCredentialsProvider implements CredentialsProvider, Serializable {
        private final Credentials credentials;

Review comment:

    But then what do we need this class for? If credentials are serializable, why aren't we storing them directly in the source/sink instance?
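For readers following this thread, a minimal sketch of the wrapper under discussion may help; the class name and constructor here are assumptions for illustration, not the PR's exact code. It shows the one thing such a class buys: CredentialsProvider itself is not Serializable, so the wrapper holds the (serializable) Credentials value and re-exposes it through getCredentials().

    import com.google.api.gax.core.CredentialsProvider;
    import com.google.auth.Credentials;

    import java.io.Serializable;

    /**
     * Minimal sketch (assumed name and constructor, not the PR's code): the wrapper stores
     * the Credentials value, which is Serializable, and hands it back through
     * getCredentials(), so the provider can travel with a serialized source or sink instance.
     */
    public class SerializableCredentialsProviderSketch implements CredentialsProvider, Serializable {
        private final Credentials credentials;

        public SerializableCredentialsProviderSketch(Credentials credentials) {
            this.credentials = credentials;
        }

        @Override
        public Credentials getCredentials() {
            return credentials;
        }
    }

Seen that way, zentol's question stands: the Credentials field could just as well live directly on the source/sink instance, with a provider built at the call site.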
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688022#comment-16688022 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233826562

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java

Quoted diff (excerpt):

    /**
     * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received.
     */
    public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase
            implements ResultTypeQueryable, ParallelSourceFunction, StoppableFunction {
        private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
        protected DeserializationSchema deserializationSchema;
        protected SubscriberWrapper subscriberWrapper;

        protected boolean running = true;
        protected transient volatile SourceContext sourceContext = null;

        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            subscriberWrapper.initialize();
            if (hasNoCheckpointingEnabled(getRuntimeContext())) {
                throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
                    "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
            }

            getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
            getRuntimeContext().getMetricGroup().gauge("PubSubMessagesReceivedNotProcessed", subscriberWrapper::amountOfMessagesInBuffer);
        }

        private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
            return !(runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled());
        }
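The quoted open() only enforces the checkpointing requirement. As a usage note, a hedged sketch of the job-side setup looks roughly like this; the construction of the source itself is elided, since the PR wires it up through package-private setters, and the interval value is an arbitrary example:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PubSubCheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The source's open() throws IllegalArgumentException when checkpointing is off,
            // so enable it before adding the source. Keep the interval well below the PubSub
            // acknowledgement deadline, as the exception message advises.
            env.enableCheckpointing(10_000L);

            // env.addSource(/* PubSubSource built via the PR's setters/builder */)
            //    .print();
            // env.execute("PubSub at-least-once example");
        }
    }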
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688023#comment-16688023 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233825877

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java

Quoted diff (excerpt):

    /**
     * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received.
     */

Review comment:

    But then the javadocs aren't quite correct, are they? Specifically `Acknowledge them as soon as they have been received`.
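The point behind this comment is that for ATLEAST_ONCE delivery the source must not acknowledge on receipt; acknowledgement has to wait until the checkpoint containing the message has completed. Below is a schematic sketch of that deferred-ack pattern, not the PR's implementation (which builds on Flink's MultipleIdsMessageAcknowledgingSourceBase); the class and helper names are made up for illustration.

    import org.apache.flink.runtime.state.CheckpointListener;

    import com.google.cloud.pubsub.v1.AckReplyConsumer;

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    /**
     * Schematic sketch of checkpoint-deferred acknowledgement: replies received since the
     * last checkpoint are parked, tagged with the checkpoint id on snapshot, and only
     * ack()'d once that checkpoint is confirmed complete.
     */
    class DeferredAckSketch implements CheckpointListener {
        private final List<AckReplyConsumer> pendingSinceLastCheckpoint = new ArrayList<>();
        private final Deque<PendingAcks> perCheckpoint = new ArrayDeque<>();

        private static final class PendingAcks {
            final long checkpointId;
            final List<AckReplyConsumer> replies;
            PendingAcks(long checkpointId, List<AckReplyConsumer> replies) {
                this.checkpointId = checkpointId;
                this.replies = replies;
            }
        }

        /** Called from the receiving path instead of acknowledging immediately. */
        void messageReceived(AckReplyConsumer reply) {
            pendingSinceLastCheckpoint.add(reply);
        }

        /** Called when a checkpoint is triggered (snapshotState in a real source). */
        void onSnapshot(long checkpointId) {
            perCheckpoint.add(new PendingAcks(checkpointId, new ArrayList<>(pendingSinceLastCheckpoint)));
            pendingSinceLastCheckpoint.clear();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Acknowledge everything belonging to checkpoints up to and including this one.
            while (!perCheckpoint.isEmpty() && perCheckpoint.peekFirst().checkpointId <= checkpointId) {
                for (AckReplyConsumer reply : perCheckpoint.pollFirst().replies) {
                    reply.ack();
                }
            }
        }
    }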
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687955#comment-16687955 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r233823440

File path: flink-examples/flink-examples-streaming/pom.xml

Quoted diff (excerpt): the new dependency on org.apache.flink:flink-connector-pubsub_${scala.binary.version}.

Review comment:

    I meant the connector module.
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687947#comment-16687947 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

zentol commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-439023607

    The test failure is likely just an instability in the kafka test, feel free to ignore it for now.
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686241#comment-16686241 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-438593620

    I checked why the build failed and all I could find is this:

        [FAIL] 'Kafka 0.10 end-to-end test' failed after 0 minutes and 54 seconds!
        Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files

    I have been unable to find out why this happens and what our changes did to cause this. Please advise.
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683937#comment-16683937 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232700234

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java

Quoted diff (excerpt):

    /**
     * This class defines a bound based on messages received or time since last received message.
     * Using start(SourceFunction) starts the bound. Every time a message is received the sourceFunction should call receivedMessage().
     * When the bound is reached, the sourceFunction gets closed by calling sourceFunction.close().
     * See {@link BoundedPubSubSource}.
     *
     * @param type of message that is received by the SourceFunction.
     */
    class Bound implements Serializable {

        private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
            this.mode = mode;
            this.maxMessagedReceived = maxMessagedReceived;
            this.maxTimeBetweenMessages = maxTimeBetweenMessages;
            this.messagesReceived = 0L;

Review comment:

    Fixed
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683940#comment-16683940 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-437922403

    From the review comments a few points remain open for me. Both are about the way the 'testing' code is part of the code base:

    - the stopping of the Source (Bound being a separate Source)
    - the connection to the local PubSub emulator (hostAndPort)

    How do we proceed on these points?
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683934#comment-16683934 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232699534

File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java

Quoted diff (excerpt):

    /**
     * Tests for {@link BoundedPubSubSource}.
     */
    public class BoundedPubSubSourceTest {
        private final Bound bound = mock(Bound.class);
        private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class);
        private final SourceFunction.SourceContext sourceContext = mock(SourceFunction.SourceContext.class);
        private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class);
        private final DeserializationSchema deserializationSchema = mock(DeserializationSchema.class);
        private final MetricGroup metricGroup = mock(MetricGroup.class);

Review comment:

    Fixed
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683926#comment-16683926 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232697684

File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java

Quoted diff (excerpt):

    /**
     * Test for {@link Bound}.
     */
    public class BoundTest {
        private SourceFunction sourceFunction = mock(SourceFunction.class);

Review comment:

    Fixed
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683894#comment-16683894 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232687817

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java

Quoted diff (excerpt):

    /**
     * This class defines a bound based on messages received or time since last received message.
     * Using start(SourceFunction) starts the bound. Every time a message is received the sourceFunction should call receivedMessage().
     * When the bound is reached, the sourceFunction gets closed by calling sourceFunction.close().
     * See {@link BoundedPubSubSource}.
     *
     * @param type of message that is received by the SourceFunction.
     */
    class Bound implements Serializable {

        static Bound boundByAmountOfMessages(long maxMessagedReceived) {
            return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
        }

        static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
            return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
        }

        static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
            return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
        }

        private synchronized void cancelPubSubSource(String logMessage) {
            if (!cancelled) {
                cancelled = true;
                sourceFunction.cancel();
                LOG.info(logMessage);
            }
        }

        void start(SourceFunction sourceFunction) {
            if (this.sourceFunction != null) {
                throw new IllegalStateException("start() already called");
            }

            this.sourceFunction = sourceFunction;
            messagesReceived = 0;

            if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
                lastReceivedMessage = System.currentTimeMillis();
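Taken together, the quoted javadoc and factory methods imply a small protocol between Bound and a source's run loop. The sketch below shows the driving side under that assumption; only the Bound factory, start() and receivedMessage() come from the quoted code and javadoc, while the demo source and the limit values are illustrative.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    /**
     * Illustrative bounded source (not the PR's BoundedPubSubSource): start(this) arms the
     * bound, receivedMessage() feeds its counter/idle timer, and the bound cancels the
     * source once either limit is hit.
     */
    class BoundedDemoSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            // Stop after 1000 messages or 60s of idleness, whichever comes first (example values).
            Bound bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1_000L, 60_000L);
            bound.start(this);

            long counter = 0L;
            while (running) {
                ctx.collect(counter++);     // stand-in for a real PubSub message
                bound.receivedMessage();    // lets the bound track message count and idle time
                Thread.sleep(10L);
            }
        }

        @Override
        public void cancel() {
            running = false;                // invoked by the bound via sourceFunction.cancel()
        }
    }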
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683906#comment-16683906 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232690036

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java

Quoted diff (excerpt):

    public class SerializableCredentialsProvider implements CredentialsProvider, Serializable {
        private final Credentials credentials;

Review comment:

    All `Credentials` implementations are in fact serializable:

        public abstract class Credentials implements Serializable {
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683883#comment-16683883 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232626013

File path: flink-examples/flink-examples-streaming/pom.xml

Quoted diff (excerpt): the new dependency on org.apache.flink:flink-connector-pubsub_${scala.binary.version}.

Review comment:

    I double checked but all the other connectors are also in org.apache.flink (so I'm not going to change this one).
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683880#comment-16683880 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232681463

File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java

Quoted diff (excerpt):

    /**
     * A sink function that outputs to PubSub.
     *
     * @param type of PubSubSink messages to write
     */
    public class PubSubSink extends RichSinkFunction {
        private SerializableCredentialsProvider serializableCredentialsProvider;
        private SerializationSchema serializationSchema;
        private String projectName;
        private String topicName;
        private String hostAndPort = null;

        private transient Publisher publisher;

        /**
         * Set the custom hostname/port combination of PubSub.
         * The ONLY reason to use this is during tests with the emulator provided by Google.
         *
         * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
         */
        void withHostAndPort(String hostAndPort) {
            this.hostAndPort = hostAndPort;
        }

        void initialize() throws IOException {
            if (serializableCredentialsProvider == null) {
                serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
            }
            if (serializationSchema == null) {
                throw new IllegalArgumentException("The serializationSchema has not been specified.");
            }
            if (projectName == null) {
                throw new IllegalArgumentException("The projectName has not been specified.");
            }
            if (topicName == null) {
                throw new IllegalArgumentException("The topicName has not been specified.");
            }
        }
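withHostAndPort exists purely so the sink can reach a local PubSub emulator. For context, here is a hedged sketch of how a plain google-cloud-pubsub Publisher is pointed at an emulator endpoint using the classes the sink already imports (FixedTransportChannelProvider, GrpcTransportChannel, ManagedChannelBuilder); the endpoint, project and topic values are made up, and usePlaintext() assumes a reasonably recent gRPC version:

    import com.google.api.gax.core.NoCredentialsProvider;
    import com.google.api.gax.grpc.GrpcTransportChannel;
    import com.google.api.gax.rpc.FixedTransportChannelProvider;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.PubsubMessage;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    public class EmulatorPublisherSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical emulator endpoint and topic; replace with your own values.
            String hostAndPort = "localhost:8085";
            ProjectTopicName topic = ProjectTopicName.of("test-project", "test-topic");

            // Plaintext channel to the emulator instead of the default Google endpoint.
            ManagedChannel channel = ManagedChannelBuilder.forTarget(hostAndPort).usePlaintext().build();

            Publisher publisher = Publisher.newBuilder(topic)
                .setChannelProvider(FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)))
                .setCredentialsProvider(NoCredentialsProvider.create())  // the emulator does not check credentials
                .build();

            publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("hello")).build()).get();

            publisher.shutdown();
            channel.shutdown();
        }
    }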
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683747#comment-16683747 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-437867011

    I have fixed a subset of the review comments and rebased against the current master.
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683699#comment-16683699 ]

ASF GitHub Bot commented on FLINK-9311:
---------------------------------------

nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636212

File path: flink-connectors/flink-connector-pubsub/pom.xml

Quoted diff (excerpt): the new module pom for flink-connector-pubsub_${scala.binary.version} (parent flink-connectors 1.7-SNAPSHOT), importing com.google.cloud:google-cloud-bom 0.53.0-alpha and depending on flink-streaming-java (provided), google-cloud-pubsub (excluding guava-jdk5, provided), slf4j-api and slf4j-log4j12; the flagged line is the slf4j-log4j12 dependency.

Review comment:

    Yes, that makes it cleaner. Fixed.
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683713#comment-16683713 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232637001 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wrapper class around a PubSub {@link Subscriber}. + * This class makes it easier to connect to a Non Google PubSub service such as a local PubSub emulator or docker container. + */ +class SubscriberWrapper implements Serializable, MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. 
> Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
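For readers unfamiliar with the PubSub client API: a MessageReceiver is called asynchronously by the Subscriber for every incoming message, and the imports in the excerpt above (BlockingQueue, Tuple2 of PubsubMessage and AckReplyConsumer) suggest the wrapper parks each message together with its ack handle until the source thread drains it. A minimal sketch of such a hand-off, assuming a queue-based design; the class name and methods are invented for illustration and are not the actual connector code:

import org.apache.flink.api.java.tuple.Tuple2;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: buffers (message, ackConsumer) pairs pushed by the Subscriber
// so that a pulling thread (the Flink source) can take them one at a time.
class QueueingReceiver implements MessageReceiver {
    private final BlockingQueue<Tuple2<PubsubMessage, AckReplyConsumer>> queue =
            new LinkedBlockingQueue<>();

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // Called on the Subscriber's executor; do not ack here, the source decides when.
        queue.offer(Tuple2.of(message, consumer));
    }

    Tuple2<PubsubMessage, AckReplyConsumer> take() throws InterruptedException {
        return queue.take();
    }
}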
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683702#comment-16683702 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636463 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) Review comment: These are not empty lines. This is yet another indentation problem. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. 
This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683711#comment-16683711 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636943 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683683#comment-16683683 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232634494 ## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties ## @@ -0,0 +1,24 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, testlogger Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683707#comment-16683707 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636831 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. Review comment: The MultipleIdsMessageAcknowledgingSourceBase will buffer the message handles and will only ack them AFTER the checkpoint has been completed. If there is a crash before the checkpoint the message has not yet been acked and will be retried (after timeout). If there is a crash after the checkpoint the message is persisted as part of the checkpoint. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
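To make the at-least-once reasoning above concrete: message handles are grouped by the checkpoint that covers them and are acknowledged towards PubSub only after that checkpoint is confirmed. The real connector delegates this bookkeeping to MultipleIdsMessageAcknowledgingSourceBase; the following is a minimal standalone sketch of the same pattern with invented names:

import com.google.cloud.pubsub.v1.AckReplyConsumer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: buffer acks until the covering checkpoint completes.
class BufferingAcker {
    // Handles received since the last checkpoint; not yet safe to acknowledge.
    private final List<AckReplyConsumer> pending = new ArrayList<>();
    // Handles grouped by the checkpoint that contains their messages.
    private final Map<Long, List<AckReplyConsumer>> perCheckpoint = new HashMap<>();

    // Called for every message emitted to the Flink collector.
    void onMessage(AckReplyConsumer consumer) {
        pending.add(consumer);
    }

    // Called when a checkpoint is triggered: the pending messages are now part of the snapshot.
    void onSnapshot(long checkpointId) {
        perCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Called when the checkpoint is confirmed: only now are the messages acked towards PubSub.
    void onCheckpointComplete(long checkpointId) {
        List<AckReplyConsumer> consumers = perCheckpoint.remove(checkpointId);
        if (consumers != null) {
            consumers.forEach(AckReplyConsumer::ack);
        }
    }
}

With this ordering, a crash before the checkpoint leaves the messages unacknowledged and PubSub redelivers them after the ack deadline, while a crash after the checkpoint replays them from the snapshot, which is exactly the ATLEAST_ONCE behaviour named in the pull request title.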
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683712#comment-16683712 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636963 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha Review comment: That was 'current' when we wrote it. I just now updated it to 0.70 ... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683708#comment-16683708 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636872 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/PubSubSubscriberFactory.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub.common; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.Serializable; + +/** + * A factory class to create a {@link Subscriber}. + * This allows for customized Subscribers with for instance tweaked configurations Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683705#comment-16683705 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636556 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build(); + Subscriber.Builder builder = Subscriber + .newBuilder(ProjectSubscriptionName.of(projectSubscriptionName.getProject(), projectSubscriptionName.getSubscription()), messageReceiver) + .setFlowControlSettings(flowControlSettings) + .setCredentialsProvider(credentialsProvider); + + if (hostAndPort != null) { + ManagedChannel managedChannel = ManagedChannelBuilder + .forTarget(hostAndPort) + .usePlaintext() // This is 'Ok' because this is ONLY used for testing. Review comment: Yes, using the host and port is something that is in practice only used in testing scenarios. That is why I put the comment on the "don't do any encryption" method. I'll see if I can find a more suitable spot. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683704#comment-16683704 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636540 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683701#comment-16683701 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636422 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. + * For example after a period of being idle or and after n amount of messages have been received. + */ +public class BoundedPubSubSource extends PubSubSource { Review comment: We chose to split these two to keep as much of the "testing code" out of the normal runtime and thus avoid polluting it. Only the host/port thing remains (which was your other comment about this). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
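A rough sketch of the two stopping conditions mentioned in that javadoc, an idle timeout and a maximum message count; the class, fields, and thresholds below are invented for illustration and are not the bounded source's actual members:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: tells a bounded source when to stop consuming.
class BoundChecker {
    private final long maxMessages;
    private final long maxIdleMillis;
    private final AtomicLong received = new AtomicLong();
    private volatile long lastMessageAt = System.currentTimeMillis();

    BoundChecker(long maxMessages, long maxIdleMillis) {
        this.maxMessages = maxMessages;
        this.maxIdleMillis = maxIdleMillis;
    }

    // Called for every received message.
    void onMessage() {
        received.incrementAndGet();
        lastMessageAt = System.currentTimeMillis();
    }

    // Stop after n messages or after being idle for too long.
    boolean shouldStop() {
        return received.get() >= maxMessages
                || System.currentTimeMillis() - lastMessageAt > maxIdleMillis;
    }
}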
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683697#comment-16683697 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636196 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. 
In this case you can use `PubSubSource.newBuilder().withCredentials(...)` + + Integration testing + +When using integration tests you might not want to connect to PubSub directly but use a docker container to read and write to. This is possible by using `PubSubSource.newBuilder().withHostAndPort("localhost:1234")`. Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
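Putting the documented builder calls together, an emulator-backed integration test would presumably be wired roughly as follows. The method names come from the documentation excerpt above; the emulator address, project and subscription names, the use of SimpleStringSchema, and the String type parameter (the excerpt lost its generics in rendering) are all assumptions:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.pubsub.PubSubSource;

public class PubSubEmulatorReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // withHostAndPort(...) points the subscriber at a local emulator or docker
        // container instead of the real Google endpoint; test-only, as discussed above.
        SourceFunction<String> emulatorSource = PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectSubscriptionName("test-project", "test-subscription")
                .withHostAndPort("localhost:8085")
                .build();

        env.addSource(emulatorSource).print();
        env.execute("PubSub emulator read test");
    }
}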
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683694#comment-16683694 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636122 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683689#comment-16683689 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232635986 ## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml ## @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub-emulator-tests + flink-connector-pubsub-emulator-tests + + jar + + Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683696#comment-16683696 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636183 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() Review comment: Yes This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
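Note that the rendering above stripped the generic parameters; deserializationSchema and pubsubSource are presumably typed, for example DeserializationSchema<SomeObject> and SourceFunction<SomeObject>. As a hedged illustration of the "way to deserialize the PubSubMessages" that the builder needs, here is a small schema that turns each message payload into an upper-cased String, assuming the connector feeds the raw message bytes into DeserializationSchema#deserialize:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Turns the payload of each PubSub message into an upper-cased String.
public class UpperCaseStringSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // PubSub subscriptions are unbounded; never end the stream based on content.
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}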
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683692#comment-16683692 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636084 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683698#comment-16683698 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636206 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha + pom + import + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + com.google.cloud + google-cloud-pubsub + + + + + com.google.guava + guava-jdk5 + + + provided + + + + org.slf4j + slf4j-api Review comment: Yes, that makes it cleaner. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683691#comment-16683691 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636055 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable; +import static org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider.withoutCredentials; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SubscriberWrapper}. 
+ */ +@RunWith(MockitoJUnitRunner.class) +public class SubscriberWrapperTest { + @Mock + private PubSubSubscriberFactory pubSubSubscriberFactory; + + @Mock + private Subscriber subscriber; + + @Mock + private ApiService apiService; + + private PubsubMessage pubsubMessage = pubSubMessage(); + + @Mock + private AckReplyConsumer ackReplyConsumer; + + private SubscriberWrapper subscriberWrapper; + + @Before + public void setup() throws Exception { + when(pubSubSubscriberFactory.getSubscriber(any(), any(), any())).thenReturn(subscriber); + subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), pubSubSubscriberFactory); + } + + @Test + public void testSerializedSubscriberBuilder() throws Exception { + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), SubscriberWrapperTest::subscriberFactory); + ensureSerializable(subscriberWrapper); + } + + @Test + public void testInitialisation() { + SerializableCredentialsProvider credentialsProvider = withoutCredentials(); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of("projectId", "subscriptionId"); + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(credentialsProvider, projectSubscriptionName, pubSubSubscriberFactory); + + subscriberWrapper.initialize(); + verify(pubSubSubscriberFactory, times(1)).getSubscriber(credentialsProvider, projectSubscriptionName, subscriberWrapper); + } + + @Test + public void testStart() { + when(subscriber.startAsync()).thenReturn(apiService); + subscriberWrapper.initialize(); + + subscriberWrapper.start(); + verify(apiService, times(1)).awaitRunning(); + assertThat(subscriberWrapper.amountOfMe
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683693#comment-16683693 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636103 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683695#comment-16683695 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636179 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. 
In this case you can use `PubSubSource.newBuilder().withCredentials(...)` Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
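As a hedged sketch of the withCredentials(...) route described above, assuming the builder accepts a com.google.auth.Credentials instance; the key file path is a placeholder and the helper class is not part of the connector:

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;

import java.io.FileInputStream;
import java.io.IOException;

public class ExternalCredentialsLoader {
    // Reads a service-account key from an external location instead of relying on the
    // GOOGLE_APPLICATION_CREDENTIALS environment variable. The returned object would
    // then be handed to PubSubSource.newBuilder()...withCredentials(credentials).
    public static Credentials load(String keyFilePath) throws IOException {
        try (FileInputStream keyFile = new FileInputStream(keyFilePath)) {
            return ServiceAccountCredentials.fromStream(keyFile);
        }
    }
}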
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683690#comment-16683690 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636021 ## File path: flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties ## @@ -0,0 +1,24 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, testlogger Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683643#comment-16683643 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232626013 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -62,6 +66,17 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-pubsub_${scala.binary.version} Review comment: I double checked but all the other connectors are also in org.apache.flink This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673065#comment-16673065 ] ASF GitHub Bot commented on FLINK-9311: --- Fokko commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230361009 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha Review comment: 53 is pretty old already. 69 is the most recent. Is there a specific reason to use this older version? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673064#comment-16673064 ] ASF GitHub Bot commented on FLINK-9311: --- Fokko commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230362058 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +/** + * This class defines a bound based on messages received or time since last received message. + * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage(). + * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close() + * See {@link BoundedPubSubSource}. + * + * @param type of message that is received by the SourceFunction. + */ +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; Review comment: The cancelled variable is set in the class, and the messageReceived in the constructor, maybe choose either one of them? To keep the code in the same style. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
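For readers following along, the style nit above is that `Bound` initialises `cancelled` at the declaration but `messagesReceived` in the constructor. A minimal sketch of one consistent variant (field and constructor signatures copied from the quoted diff; the opposite convention would satisfy the comment equally well, and this is not the committed fix):

{% highlight java %}
// Initialise all non-argument fields at the declaration site, so the
// constructor only copies its constructor arguments.
private long messagesReceived = 0L;
private long lastReceivedMessage;
private boolean cancelled = false;

private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
	this.mode = mode;
	this.maxMessagedReceived = maxMessagedReceived;
	this.maxTimeBetweenMessages = maxTimeBetweenMessages;
}
{% endhighlight %}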
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673063#comment-16673063 ] ASF GitHub Bot commented on FLINK-9311: --- Fokko commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230362843 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wrapper class around a PubSub {@link Subscriber}. + * This class makes it easier to connect to a Non Google PubSub service such as a local PubSub emulator or docker container. + */ +class SubscriberWrapper implements Serializable, MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); Review comment: Should be SubscriberWrapper.class This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. 
This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
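The one-line change requested above would look like this (a sketch matching the field declaration quoted in the diff):

{% highlight java %}
// The logger should reference the class it is declared in, not PubSubSource.
private static final Logger LOG = LoggerFactory.getLogger(SubscriberWrapper.class);
{% endhighlight %}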
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673062#comment-16673062 ] ASF GitHub Bot commented on FLINK-9311: --- Fokko commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230359429 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) Review comment: Missing spaces around the braces This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
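For context, the nit above presumably refers to the Liquid/Jekyll convention used elsewhere in the docs, i.e. writing the link as `[here]({{ site.baseurl }}/dev/linking.html)` with spaces inside the braces.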
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672740#comment-16672740 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230290838 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable; +import static org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider.withoutCredentials; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SubscriberWrapper}. 
+ */ +@RunWith(MockitoJUnitRunner.class) +public class SubscriberWrapperTest { + @Mock + private PubSubSubscriberFactory pubSubSubscriberFactory; + + @Mock + private Subscriber subscriber; + + @Mock + private ApiService apiService; + + private PubsubMessage pubsubMessage = pubSubMessage(); + + @Mock + private AckReplyConsumer ackReplyConsumer; + + private SubscriberWrapper subscriberWrapper; + + @Before + public void setup() throws Exception { + when(pubSubSubscriberFactory.getSubscriber(any(), any(), any())).thenReturn(subscriber); + subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), pubSubSubscriberFactory); + } + + @Test + public void testSerializedSubscriberBuilder() throws Exception { + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), SubscriberWrapperTest::subscriberFactory); + ensureSerializable(subscriberWrapper); + } + + @Test + public void testInitialisation() { + SerializableCredentialsProvider credentialsProvider = withoutCredentials(); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of("projectId", "subscriptionId"); + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(credentialsProvider, projectSubscriptionName, pubSubSubscriberFactory); + + subscriberWrapper.initialize(); + verify(pubSubSubscriberFactory, times(1)).getSubscriber(credentialsProvider, projectSubscriptionName, subscriberWrapper); + } + + @Test + public void testStart() { + when(subscriber.startAsync()).thenReturn(apiService); + subscriberWrapper.initialize(); + + subscriberWrapper.start(); + verify(apiService, times(1)).awaitRunning(); + assertThat(subscriberWrapper.amountOfMessage
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672734#comment-16672734 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230292200 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha + pom + import + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + com.google.cloud + google-cloud-pubsub + + + + + com.google.guava + guava-jdk5 + + + provided + + + + org.slf4j + slf4j-api Review comment: you may omit this dependency; all modules have it by default and it's pretty much just noise. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672736#comment-16672736 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230295883 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +/** + * Tests for {@link BoundedPubSubSource}. + */ +public class BoundedPubSubSourceTest { + private final Bound bound = mock(Bound.class); + private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class); + private final SourceFunction.SourceContext sourceContext = mock(SourceFunction.SourceContext.class); + private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class); + private final DeserializationSchema deserializationSchema = mock(DeserializationSchema.class); + private final MetricGroup metricGroup = mock(MetricGroup.class); Review comment: It is never necessary to mock `MetricGroup`, instead just use a `UnregisteredMetricsGroup`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
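The replacement suggested above is a one-line change; a sketch is below. The field is the one quoted in the diff, and `UnregisteredMetricsGroup` is Flink's no-op `MetricGroup` implementation (the import path is assumed from the Flink codebase of that era).

{% highlight java %}
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

// Instead of: private final MetricGroup metricGroup = mock(MetricGroup.class);
private final MetricGroup metricGroup = new UnregisteredMetricsGroup();
{% endhighlight %}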
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672738#comment-16672738 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230294739 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. 
+ */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase + implements ResultTypeQueryable, ParallelSourceFunction, StoppableFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected DeserializationSchema deserializationSchema; + protected SubscriberWrapper subscriberWrapper; + + protected boolean running = true; + protected transient volatile SourceContext sourceContext = null; + + protected PubSubSource() { + super(String.class); + } + + protected void setDeserializationSchema(DeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + protected void setSubscriberWrapper(SubscriberWrapper subscriberWrapper) { + this.subscriberWrapper = subscriberWrapper; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + subscriberWrapper.initialize(); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message."); + } + + getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck); + getRuntimeContext().getMetricGroup().gauge("PubSubMessagesReceivedNotProcessed", subscriberWrapper::amountOfMessagesInBuffer); + } + + private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) { + return !(runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()); + } + + @Override + protected void acknow
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672744#comment-16672744 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230293145 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. + * For example after a period of being idle or and after n amount of messages have been received. + */ +public class BoundedPubSubSource extends PubSubSource { Review comment: I don't think this class is necessary. The `PubSubSource` could also have a `Bound` field, which by default is a No-Op. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
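A rough sketch of the suggestion above, purely for illustration and not code from the pull request: `PubSubSource` keeps a `Bound` field that defaults to a no-op, so the `BoundedPubSubSource` subclass is no longer needed. The factory name `boundByNothing()` is hypothetical; `start(...)` and `receivedMessage()` are the methods described in `Bound`'s javadoc, and the `String` element type is assumed for brevity.

{% highlight java %}
// Inside PubSubSource (sketch): a bound that never triggers is used unless
// a caller explicitly installs a counting or timing bound.
private Bound<String> bound = Bound.boundByNothing();   // hypothetical no-op factory

void setBound(Bound<String> bound) {
	this.bound = bound;   // e.g. set by a builder option or by tests
}

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
	bound.start(this);   // with the no-op bound this never cancels the source
	// existing receive loop, calling bound.receivedMessage() for every message
}
{% endhighlight %}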
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672743#comment-16672743 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230294923 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/PubSubSubscriberFactory.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub.common; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.Serializable; + +/** + * A factory class to create a {@link Subscriber}. + * This allows for customized Subscribers with for instance tweaked configurations Review comment: missing period at the end This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
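As a usage illustration of the factory interface being reviewed here (a sketch, not code from the pull request): the `getSubscriber(...)` parameter order is taken from how the tests in this thread invoke it, and `Subscriber.newBuilder(...)` / `setCredentialsProvider(...)` are the standard google-cloud-pubsub builder methods.

{% highlight java %}
// A custom factory, e.g. to tweak flow control or point at an emulator,
// expressed as a lambda over the PubSubSubscriberFactory interface.
PubSubSubscriberFactory customFactory =
	(credentialsProvider, projectSubscriptionName, messageReceiver) ->
		Subscriber.newBuilder(projectSubscriptionName, messageReceiver)
			.setCredentialsProvider(credentialsProvider)
			.build();
{% endhighlight %}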
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672739#comment-16672739 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230292833 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +/** + * This class defines a bound based on messages received or time since last received message. + * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage(). + * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close() + * See {@link BoundedPubSubSource}. + * + * @param type of message that is received by the SourceFunction. 
+ */ +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); +
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672741#comment-16672741 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291579 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. In this case you can use `PubSubSource.newBuilder().withCredentials(...)` Review comment: -> "For instance, if you [...] 
external system you can use" missing period at the end of the sentence. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily s
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672742#comment-16672742 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230295359 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub.common; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.auth.Credentials; + +import java.io.IOException; +import java.io.Serializable; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * Wrapper class for CredentialsProvider to make it Serializable. This can be used to pass on Credentials to SourceFunctions + */ +public class SerializableCredentialsProvider implements CredentialsProvider, Serializable { + private final Credentials credentials; Review comment: Unless all `Credentials` implementations happen to be serializable then this wrapper provides little value, as the serialization can still fail. This issue can be circumvented by letting the user pass a factory for creating credentials, similar to what the cassandra sinks do. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
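A sketch of the factory approach the reviewer refers to (type and method names are invented for illustration; the Cassandra connector's `ClusterBuilder` is the pattern being cited): instead of serialising a `Credentials` instance, the user ships a serialisable factory and the credentials are created on the task managers.

{% highlight java %}
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;

/** Hypothetical replacement for serialising Credentials directly. */
public interface CredentialsFactory extends Serializable {
	Credentials createCredentials() throws IOException;
}

/** Example implementation: load a service account key file on the worker. */
class KeyFileCredentialsFactory implements CredentialsFactory {
	private final String keyFilePath;

	KeyFileCredentialsFactory(String keyFilePath) {
		this.keyFilePath = keyFilePath;
	}

	@Override
	public Credentials createCredentials() throws IOException {
		try (FileInputStream in = new FileInputStream(keyFilePath)) {
			return GoogleCredentials.fromStream(in);
		}
	}
}
{% endhighlight %}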
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672733#comment-16672733 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230293861 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. 
+* The ONLY reason to use this is during tests with the emulator provided by Google. +* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel c
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672730#comment-16672730 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291645 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. 
In this case you can use `PubSubSource.newBuilder().withCredentials(...)` + + Integration testing + +When using integration tests you might not want to connect to PubSub directly but use a docker container to read and write to. This is possible by using `PubSubSource.newBuilder().withHostAndPort("localhost:1234")`. Review comment: -> "When running" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would li
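The quoted documentation above introduces `withHostAndPort` for pointing the connector at the PubSub emulator. As a minimal sketch of that usage, assuming the builder methods shown in the quoted docs (`withDeserializationSchema`, `withProjectSubscriptionName`, `withHostAndPort`) and using placeholder project, subscription, and emulator address values:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.pubsub.PubSubSource;

public class PubSubEmulatorExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Point the source at a local emulator instead of the real PubSub endpoint.
		// "localhost:8085" is only a placeholder for wherever the emulator listens.
		SourceFunction<String> pubsubSource = PubSubSource.<String>newBuilder()
			.withDeserializationSchema(new SimpleStringSchema())
			.withProjectSubscriptionName("test-project", "test-subscription")
			.withHostAndPort("localhost:8085")
			.build();

		DataStream<String> messages = env.addSource(pubsubSource);
		messages.print();

		env.execute("PubSub emulator smoke test");
	}
}
```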
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672725#comment-16672725 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230295643 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +/** + * Test for {@link Bound}. + */ +public class BoundTest { + private SourceFunction sourceFunction = mock(SourceFunction.class); Review comment: The `SourceFunction` interface only contains 2 methods, please just create a proper implementation. We'd like to reduce the amount of mocking in our tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
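As a sketch of the reviewer's suggestion above, a hand-written `SourceFunction` stand-in (name and placement are hypothetical) that the `Bound` tests could use instead of a Mockito mock, recording cancellation so assertions can replace `verify(...)`:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Minimal hand-written SourceFunction for tests: it records whether cancel()
 * was called so the test can assert on it instead of verifying a mock.
 */
class TestSourceFunction implements SourceFunction<Object> {
	private volatile boolean cancelled = false;

	@Override
	public void run(SourceContext<Object> ctx) {
		// No-op: the Bound tests only need something to cancel.
	}

	@Override
	public void cancel() {
		cancelled = true;
	}

	boolean isCancelled() {
		return cancelled;
	}
}
```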
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672721#comment-16672721 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230290509 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -62,6 +66,17 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-pubsub_${scala.binary.version} Review comment: let's keep the examples in the connector package as we do for most other connectors. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672729#comment-16672729 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230294657 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. Review comment: I'll need a bit of background here. Let's say that the sources consumes a PubSub message, acknowledges it, and the job subsequently fails and restarts. In this case, will the message be consumed again? If not, to my understanding, then the source does not provide at-least-once guarantees. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
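For context on the at-least-once question raised above: a common pattern is to defer acknowledgements until the checkpoint containing those messages has completed, so that messages not yet acknowledged are redelivered by PubSub after a failure. The following is a framework-agnostic sketch of that bookkeeping, not the PR's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Sketch of checkpoint-deferred acknowledgement: ack ids are buffered per
 * checkpoint and only acknowledged once that checkpoint is known to be durable.
 * Until then PubSub will redeliver the messages after the ack deadline expires,
 * which is what gives at-least-once behaviour across restarts.
 */
class DeferredAcks {
	/** Ack ids received since the last snapshot. */
	private List<String> pendingSinceLastSnapshot = new ArrayList<>();
	/** Ack ids per in-flight checkpoint, oldest first. */
	private final Deque<Snapshot> inFlight = new ArrayDeque<>();

	void onMessage(String ackId) {
		pendingSinceLastSnapshot.add(ackId);
	}

	void onSnapshot(long checkpointId) {
		inFlight.addLast(new Snapshot(checkpointId, pendingSinceLastSnapshot));
		pendingSinceLastSnapshot = new ArrayList<>();
	}

	void onCheckpointComplete(long checkpointId, AckCallback acker) {
		// Acknowledge everything up to and including the completed checkpoint.
		while (!inFlight.isEmpty() && inFlight.peekFirst().checkpointId <= checkpointId) {
			acker.ack(inFlight.pollFirst().ackIds);
		}
	}

	interface AckCallback {
		void ack(List<String> ackIds);
	}

	private static final class Snapshot {
		final long checkpointId;
		final List<String> ackIds;

		Snapshot(long checkpointId, List<String> ackIds) {
			this.checkpointId = checkpointId;
			this.ackIds = ackIds;
		}
	}
}
```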
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672727#comment-16672727 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230293352 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) Review comment: at most 1 empty line should be here This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. 
This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672737#comment-16672737 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291173 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. Review comment: -> ´provide a google project, pubsub subscription and a way to deserialize the PubSubMessages` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672720#comment-16672720 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230290255 ## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties ## @@ -0,0 +1,24 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, testlogger Review comment: by convention all test loggers should be `OFF` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672726#comment-16672726 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230292232 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha + pom + import + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + com.google.cloud + google-cloud-pubsub + + + + + com.google.guava + guava-jdk5 + + + provided + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + org.slf4j + slf4j-log4j12 Review comment: you may omit this dependency; all modules have it by default and it's pretty much just noise. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672728#comment-16672728 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291286 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() Review comment: Is the generic argument for newBuilder actually necessary? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
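One way to make the explicit type argument on `newBuilder` unnecessary is to let the element type be inferred from the `DeserializationSchema`, i.e. the first call that knows the type returns a typed builder. The class names below are hypothetical and this is not the PR's API, only an illustration of that builder shape:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;

/**
 * Sketch: the element type flows from the DeserializationSchema, so callers
 * never need to write PubSubSource.<String>newBuilder().
 */
final class PubSubSourceBuilderSketch {

	static UntypedBuilder newBuilder() {
		return new UntypedBuilder();
	}

	static final class UntypedBuilder {
		<OUT> TypedBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> schema) {
			return new TypedBuilder<>(schema);
		}
	}

	static final class TypedBuilder<OUT> {
		private final DeserializationSchema<OUT> schema;

		private TypedBuilder(DeserializationSchema<OUT> schema) {
			this.schema = schema;
		}

		TypedBuilder<OUT> withProjectSubscriptionName(String project, String subscription) {
			// ... remember project/subscription ...
			return this;
		}

		// build() would return the typed SourceFunction<OUT> here.
	}
}
```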
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672735#comment-16672735 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230293662 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build(); + Subscriber.Builder builder = Subscriber + .newBuilder(ProjectSubscriptionName.of(projectSubscriptionName.getProject(), projectSubscriptionName.getSubscription()), messageReceiver) + .setFlowControlSettings(flowControlSettings) + .setCredentialsProvider(credentialsProvider); + + if (hostAndPort != null) { + ManagedChannel managedChannel = ManagedChannelBuilder + .forTarget(hostAndPort) + .usePlaintext() // This is 'Ok' because this is ONLY used for testing. Review comment: ehhI'd like to avoid branches that are only used in tests in production code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSu
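A possible way to address the comment above is to keep the plaintext channel in a separate factory that only test code instantiates, so the production `DefaultPubSubSubscriberFactory` loses the `hostAndPort` branch entirely. The sketch below mirrors the calls visible in the quoted diff (`ManagedChannelBuilder`, `GrpcTransportChannel`, `FixedTransportChannelProvider`, `Subscriber.newBuilder`); the class name and its placement under test sources are assumptions:

```java
import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * Hypothetical emulator-only factory: the plaintext channel lives here, so the
 * production DefaultPubSubSubscriberFactory needs no hostAndPort branch.
 */
class PubSubEmulatorSubscriberFactory implements PubSubSubscriberFactory {
	private final String hostAndPort;

	PubSubEmulatorSubscriberFactory(String hostAndPort) {
		this.hostAndPort = hostAndPort;
	}

	@Override
	public Subscriber getSubscriber(CredentialsProvider credentialsProvider,
									ProjectSubscriptionName projectSubscriptionName,
									MessageReceiver messageReceiver) {
		// Plaintext is acceptable here because this factory only ever talks to a local emulator.
		ManagedChannel managedChannel = ManagedChannelBuilder
			.forTarget(hostAndPort)
			.usePlaintext()
			.build();

		return Subscriber
			.newBuilder(projectSubscriptionName, messageReceiver)
			.setChannelProvider(FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
			.setCredentialsProvider(credentialsProvider)
			.build();
	}
}
```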
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672732#comment-16672732 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291060 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. Review comment: second sentence is cut off. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672722#comment-16672722 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230291003 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} Review comment: add space after `{{` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672731#comment-16672731 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230293364 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; Review comment: can be final This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672723#comment-16672723 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230290793 ## File path: flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties ## @@ -0,0 +1,24 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, testlogger Review comment: set to `OFF` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672724#comment-16672724 ] ASF GitHub Bot commented on FLINK-9311: --- zentol commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r230290661 ## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml ## @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub-emulator-tests + flink-connector-pubsub-emulator-tests + + jar + + Review comment: please moves this comment into the dependency itself, i.e. ``` com.google.cloud google-cloud-bom 0.53.0-alpha pom import ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611065#comment-16611065 ] ASF GitHub Bot commented on FLINK-9311: --- sceee commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r216775909 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SourceFunction pubsubSink = PubSubSink.newBuilder() Review comment: This should be SinkFunction, right? So `SinkFunction pubsubSink = PubSubSink.newBuilder()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. 
This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593118#comment-16593118 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212852318 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: Sorry, I missed your `PubSubSinkBuilder`, I just saw that you have done a lot of NPE checks on the `initialize` method in this class. If they are mandatory fields, you can pass them directly in the constructor of `PubSubSinkBuilder`. Maybe this can save these checks? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
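The suggestion above is to accept the mandatory arguments where the builder is created, so the "has not been specified" checks in `initialize()` disappear. A hedged sketch of that shape (class name hypothetical, not the PR's code):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;

/**
 * Sketch: required arguments are taken up front, so build()/initialize()
 * no longer needs null checks for them.
 */
final class PubSubSinkBuilderSketch<IN> {
	private final SerializationSchema<IN> serializationSchema;
	private final String projectName;
	private final String topicName;
	private String hostAndPort; // optional, only for the emulator

	PubSubSinkBuilderSketch(SerializationSchema<IN> serializationSchema, String projectName, String topicName) {
		if (serializationSchema == null || projectName == null || topicName == null) {
			throw new IllegalArgumentException("serializationSchema, projectName and topicName are required");
		}
		this.serializationSchema = serializationSchema;
		this.projectName = projectName;
		this.topicName = topicName;
	}

	PubSubSinkBuilderSketch<IN> withHostAndPort(String hostAndPort) {
		this.hostAndPort = hostAndPort;
		return this;
	}

	// build() would construct the sink from the already-validated fields.
}
```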
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593115#comment-16593115 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212851768 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already 
called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: yes,
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592832#comment-16592832 ] ASF GitHub Bot commented on FLINK-9311: --- Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212798502 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already 
called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: What inf
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592581#comment-16592581 ] ASF GitHub Bot commented on FLINK-9311: --- Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-415969449 Thank you for your review, @yanghua! I made edits to fix most of them; I've posted 2 follow-up questions on the points that were not entirely clear to me :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592578#comment-16592578 ] ASF GitHub Bot commented on FLINK-9311: --- Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212798366 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: I don't entirely understand what you mean, there is a builder to add the required fields. Do you mean adding a constructor to construct the object without having to use a builder? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
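The open question in this thread is whether the sink's mandatory fields should be set through a constructor or through the builder. A common compromise, sketched below with assumed names (this is not the code from the PR), is to keep the fluent builder but have build() hand the required fields to a private constructor, so validation lives in one place and a half-configured sink cannot be created.

```java
import org.apache.flink.api.common.serialization.SerializationSchema;

// Hypothetical sketch: mandatory fields are constructor-injected, the builder
// only collects them plus the optional settings.
final class ExampleSink<IN> {
    private final SerializationSchema<IN> serializationSchema;
    private final String projectName;
    private final String topicName;
    private final String hostAndPort; // optional, may stay null

    private ExampleSink(SerializationSchema<IN> schema, String project, String topic, String hostAndPort) {
        if (schema == null || project == null || topic == null) {
            throw new IllegalArgumentException("serializationSchema, projectName and topicName are required");
        }
        this.serializationSchema = schema;
        this.projectName = project;
        this.topicName = topic;
        this.hostAndPort = hostAndPort;
    }

    static <IN> Builder<IN> builder() {
        return new Builder<>();
    }

    static final class Builder<IN> {
        private SerializationSchema<IN> schema;
        private String project;
        private String topic;
        private String hostAndPort;

        Builder<IN> withSerializationSchema(SerializationSchema<IN> schema) { this.schema = schema; return this; }
        Builder<IN> withProjectName(String project) { this.project = project; return this; }
        Builder<IN> withTopicName(String topic) { this.topic = topic; return this; }
        Builder<IN> withHostAndPort(String hostAndPort) { this.hostAndPort = hostAndPort; return this; }

        ExampleSink<IN> build() {
            // Validation happens exactly once, in the constructor.
            return new ExampleSink<>(schema, project, topic, hostAndPort);
        }
    }
}
```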
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587655#comment-16587655 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664304 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already 
called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: Maybe
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587645#comment-16587645 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664202 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received. + * + */ +public class BoundedPubSubSource extends PubSubSource { + private Bound bound; + + private BoundedPubSubSource() { + super(); + } + + protected void setBound(Bound bound) { + this.bound = bound; + } + + @Override + public void run(SourceContext sourceContext) { + bound.start(this); + super.run(sourceContext); + } + + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + super.receiveMessage(message, consumer); + bound.receivedMessage(); + } + + /** +* Creates a {@link BoundedPubSubSourceBuilder}. +* @param Type of Object which will be read by the produced {@link BoundedPubSubSource} +*/ + @SuppressWarnings("unchecked") + public static BoundedPubSubSourceBuilder newBuilder() { + return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource()); + } + + /** +* Builder to create BoundedPubSubSource. +* @param Type of Object which will be read by the BoundedPubSubSource +*/ + @SuppressWarnings("unchecked") + public static class BoundedPubSubSourceBuilder, BUILDER extends BoundedPubSubSourceBuilder> extends PubSubSourceBuilder { Review comment: Too long This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. 
I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587653#comment-16587653 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664265 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already 
called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; + + if ((mode
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587646#comment-16587646 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663768 ## File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.pubsub; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; +import java.math.BigInteger; + +class IntegerSerializer implements DeserializationSchema, SerializationSchema { Review comment: add doc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
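The "add doc" nit is easy to satisfy; for context, a documented version of such a schema could look roughly like the sketch below. The method bodies are assumptions (the quoted excerpt only shows the class declaration), based on the BigInteger import visible in the diff.

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.math.BigInteger;

/**
 * (De)serializes {@link BigInteger} values to and from their two's-complement
 * byte representation, for use by the PubSub example job.
 */
class IntegerSerializer implements DeserializationSchema<BigInteger>, SerializationSchema<BigInteger> {

    @Override
    public BigInteger deserialize(byte[] bytes) throws IOException {
        return new BigInteger(bytes);
    }

    @Override
    public boolean isEndOfStream(BigInteger nextElement) {
        return false; // the PubSub stream is unbounded
    }

    @Override
    public TypeInformation<BigInteger> getProducedType() {
        return TypeInformation.of(BigInteger.class);
    }

    @Override
    public byte[] serialize(BigInteger element) {
        return element.toByteArray();
    }
}
```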
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587650#comment-16587650 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664160 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: some requisite field we can inject with constructor? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. 
I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587652#comment-16587652 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664027 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.Serializable; + +class SubscriberWrapper implements Serializable { Review comment: add doc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
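For the same "add doc" request on SubscriberWrapper, a class-level Javadoc inferred from the imports in the diff (so the wording here is a guess, not the PR's text) might read:

```java
/**
 * Serializable holder for everything needed to build a Google Cloud PubSub
 * {@link com.google.cloud.pubsub.v1.Subscriber}: the credentials provider, the
 * project/subscription name and, optionally, a host-and-port override for the emulator.
 *
 * <p>The {@code Subscriber} itself is presumably created lazily on the task managers,
 * since the Google client classes are not serializable.
 */
```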
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587654#comment-16587654 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664234 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received. Review comment: Too long, I suggest break the line. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
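The long Javadoc line flagged above also reads a bit awkwardly ("after a period of idle or and after n amount of messages"); wrapped and smoothed, it could be:

```java
/**
 * A bounded PubSub source, similar to {@link PubSubSource} but it will stop at some point,
 * for example after a period of being idle or after n messages have been received.
 */
```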
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587644#comment-16587644 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663809 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -62,6 +62,17 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-pubsub_${scala.binary.version} + ${project.version} + + + com.google.cloud + google-cloud-pubsub + 1.31.0 Review comment: use property to define version number looks better This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587647#comment-16587647 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664117 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. +* The ONLY reason to use this is during tests with the emulator provided by Google. 
+* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher +
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587648#comment-16587648 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664341 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { Review comment: We'd better add a doc for the class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
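And for the class-level doc requested on Bound, something along these lines (my wording, derived from the behaviour visible in the diff) would do:

```java
/**
 * Decides when a bounded run of the PubSub source should stop: after a fixed number of
 * messages, after a period without any messages, or whichever of the two happens first,
 * depending on the configured {@code Mode}. Once the bound is reached, it cancels the
 * wrapped {@code SourceFunction}.
 */
```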
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587643#comment-16587643 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663706 ## File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.pubsub; + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; + +import java.math.BigInteger; + +class PubSubPublisher { Review comment: add doc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587649#comment-16587649 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664134 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. +* The ONLY reason to use this is during tests with the emulator provided by Google. 
+* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher +
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587651#comment-16587651 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664074 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; +import java.util.List; + + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. + */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase implements MessageReceiver, ResultTypeQueryable, ParallelSourceFunction { Review comment: too long This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. 
I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587208#comment-16587208 ] ASF GitHub Bot commented on FLINK-9311: --- Xeli opened a new pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594 **What is the purpose of the change** Adding a PubSub connector with support for Checkpointing **Verifying this change** This change added tests and can be verified as follows: - Added unit tests. - Added integration tests to flink-end-to-end which runs against docker. - An example has been added in flink-examples which runs against the actual Google PubSub service. this has been manually verified. - Is there a need for integration tests? We feel like there is and have added them. **Does this pull request potentially affect one of the following parts:** - Dependencies (does it add or upgrade a dependency): Yes, Google Cloud Sdk for PubSub but because it is a connector this does not add any dependencies in flink itself. - The public API, i.e., is any changed class annotated with @Public(Evolving): No - The serializers: No - The runtime per-record code paths (performance sensitive): Nothing has been changed only a connector has been added. - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, - Yarn/Mesos, ZooKeeper: No - The S3 file system connector: No **Documentation** Does this pull request introduce a new feature? Yes If yes, how is the feature documented? JavaDocs, added an example in flink-examples and we updated the website docs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
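Since the PR description mentions an example job and ATLEAST_ONCE checkpointing, a rough usage sketch follows. The builder method names are assumptions read off the diffs earlier in this thread, so treat this as illustrative rather than as the connector's final API.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Package layout follows the PR; class names are taken from the diffs above.
import org.apache.flink.streaming.connectors.pubsub.PubSubSource;

import java.math.BigInteger;

public class PubSubExampleSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing must be enabled for the acknowledge-on-checkpoint
        // (ATLEAST_ONCE) behaviour described in the PR to take effect.
        env.enableCheckpointing(10_000);

        env.addSource(
                // Hypothetical builder calls; method names are assumptions, not the PR's exact API.
                PubSubSource.<BigInteger>newBuilder()
                        .withDeserializationSchema(new IntegerSerializer())
                        .withProjectName("my-gcp-project")
                        .withSubscriptionName("my-subscription")
                        .build())
           .map(BigInteger::toString)
           .print();

        env.execute("Flink PubSub example (sketch)");
    }
}
```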
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547551#comment-16547551 ] Niels Basjes commented on FLINK-9311: - FYI: Richard Deurwaarder (Xeli) is a colleague of mine at bol.com and we are working together to create this feature (we need it for a project we are doing). We'll put up a new pull request when we think it works as we want it to. > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547498#comment-16547498 ] ASF GitHub Bot commented on FLINK-9311: --- Github user Xeli commented on the issue: https://github.com/apache/flink/pull/6248 I will close this PR for now because we'd like to add some more features such as a PubSubSink and some integration tests. > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547499#comment-16547499 ] ASF GitHub Bot commented on FLINK-9311: --- Github user Xeli closed the pull request at: https://github.com/apache/flink/pull/6248 > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532393#comment-16532393 ] ASF GitHub Bot commented on FLINK-9311: --- Github user Xeli commented on the issue: https://github.com/apache/flink/pull/6248 I've had to make a small tweak to the code (I rebased it because I don't think anyone has looked at this yet). > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531854#comment-16531854 ] ASF GitHub Bot commented on FLINK-9311: --- GitHub user Xeli opened a pull request: https://github.com/apache/flink/pull/6248 [FLINK-9311] [pubsub] Added PubSub connector with support for checkpointing
## What is the purpose of the change
Adding a PubSub connector with support for checkpointing.
## Verifying this change
This change added tests and can be verified as follows:
- *Added unit tests*
- *Manually verified the connector (without checkpointing) on an actual PubSub topic and subscription.*
**Is there a need for integration tests? I did not see any for the other connectors. What is a good way of testing the checkpointing / exactly-once behavior?**
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes, the Google Cloud SDK for PubSub (**Does this need to be shaded?**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): don't know, don't think so
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, checkpointing
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Xeli/flink master
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6248.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6248
commit dddbe671a0d663045110b89ad9bb85ce9a7e7051
Author: Richard Deurwaarder
Date: 2018-05-26T12:59:32Z
[FLINK-9311] [pubsub] Add PubSubSource without checkpointing
commit 30fab0fd6810691f22ff583ce3f942e247d9fe45
Author: Richard Deurwaarder
Date: 2018-07-03T17:34:02Z
[FLINK-9311] [pubsub] Add checkpointing to PubSubSource
> PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
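On the integration-test question in the PR description above: one common approach is to run the Google Pub/Sub emulator (for example in Docker) and point the client at it over a plaintext channel with no credentials. The sketch below shows that wiring; the endpoint localhost:8085 and the approach itself are assumptions for illustration, not necessarily what the final pull request does.

```java
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class EmulatorSubscriberSketch {

    /** Builds a Subscriber that talks to a local Pub/Sub emulator instead of the real service. */
    public static Subscriber emulatorSubscriber(
            ProjectSubscriptionName subscription, MessageReceiver receiver) {
        // Plaintext channel to the emulator; no TLS and no real credentials needed.
        ManagedChannel channel =
                ManagedChannelBuilder.forTarget("localhost:8085").usePlaintext().build();
        TransportChannelProvider channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

        return Subscriber.newBuilder(subscription, receiver)
                .setChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build();
    }
}
```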
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471908#comment-16471908 ] Stephan Ewen commented on FLINK-9311: - Please take a look at the {{MessageAcknowledgingSourceBase}} and the way the {{RMQSource}} uses that. > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
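To make the suggestion above concrete for readers unfamiliar with those classes: the idea behind {{MessageAcknowledgingSourceBase}} (and the way {{RMQSource}} uses it) is to buffer acknowledgement handles per pending checkpoint and only acknowledge back to the broker once that checkpoint is confirmed. The sketch below shows the same pattern with a plain SourceFunction plus CheckpointListener instead of the actual base class, and it omits the operator-state handling the real base class provides; treat it as an illustration of the pattern, not the PR's implementation.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import com.google.cloud.pubsub.v1.AckReplyConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of the acknowledge-on-checkpoint pattern; not the code from this PR. */
public class PerCheckpointAckingSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Filled by a Pub/Sub MessageReceiver (not shown): payload plus its ack handle.
    private final LinkedBlockingQueue<Tuple2<String, AckReplyConsumer>> incoming =
            new LinkedBlockingQueue<>();

    // Ack handles of messages emitted since the last snapshot.
    private final List<AckReplyConsumer> pendingAcks = new ArrayList<>();

    // Ack handles grouped by the checkpoint that covers them.
    private final SortedMap<Long, List<AckReplyConsumer>> acksPerCheckpoint = new TreeMap<>();

    private volatile boolean running;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        running = true;
        while (running) {
            Tuple2<String, AckReplyConsumer> next = incoming.poll(1, TimeUnit.SECONDS);
            if (next == null) {
                continue;
            }
            // Emit and remember the ack handle atomically with respect to snapshots.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next.f0);
                pendingAcks.add(next.f1);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Everything emitted so far belongs to this checkpoint. The real base class
        // additionally persists these ids in operator state.
        acksPerCheckpoint.put(context.getCheckpointId(), new ArrayList<>(pendingAcks));
        pendingAcks.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Omitted: restoring state. Messages that were never acked are simply
        // redelivered by Pub/Sub, which is what gives at-least-once semantics.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only now is it safe to tell Pub/Sub not to redeliver these messages.
        SortedMap<Long, List<AckReplyConsumer>> confirmed =
                acksPerCheckpoint.headMap(checkpointId + 1);
        confirmed.values().forEach(acks -> acks.forEach(AckReplyConsumer::ack));
        confirmed.clear();
    }
}
```

The trade-off is that messages received between two checkpoints are redelivered after a failure, which matches the ATLEAST_ONCE guarantee named in the pull request title.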
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466409#comment-16466409 ] Richard Deurwaarder commented on FLINK-9311: PubSub works more like RabbitMQ than Kafka, using ack/nack. It has at-least-once delivery, but exactly-once can be achieved by using the [message id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--]. Is there an existing connector that uses message ids for deduplication? > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
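To illustrate the deduplication idea mentioned above: Pub/Sub redeliveries carry the same message id, so a consumer can drop ids it has already seen. This is only a sketch under simplified assumptions (an unbounded in-memory set, no checkpointed state, ids never expired); it is not taken from any existing connector.

```java
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Drops redelivered messages by remembering their Pub/Sub message ids. */
public class DeduplicatingReceiver implements MessageReceiver {

    private final Set<String> seenMessageIds = ConcurrentHashMap.newKeySet();
    private final Consumer<PubsubMessage> downstream;

    public DeduplicatingReceiver(Consumer<PubsubMessage> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // add() returns false if the id was already seen, i.e. this is a redelivery.
        if (seenMessageIds.add(message.getMessageId())) {
            downstream.accept(message);
        }
        // Ack in both cases so Pub/Sub stops redelivering.
        consumer.ack();
    }
}
```

Whether the seen-id set itself survives failures (for example by keeping it in checkpointed state) determines whether this actually upgrades the at-least-once guarantee in practice.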
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466356#comment-16466356 ] Stephan Ewen commented on FLINK-9311: - The RichParallelSourceFunction is a good place to start. What is the exactly-once model behind PubSub? Is it message ack/commit, like for example in RabbitMQ, or is it sequence-number/offset commits, like in Kafka/Kinesis? > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)