[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212852318 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: Sorry, I missed your `PubSubSinkBuilder`, I just saw that you have done a lot of NPE checks on the `initialize` method in this class. If they are mandatory fields, you can pass them directly in the constructor of `PubSubSinkBuilder`. Maybe this can save these checks? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212851768 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: yes, I mean to expose the number of messages received as metrics, just an idea, not sure if it is a good suggestion. This is an automated message from the Apache Git Service. To respond to the me
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664265 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; + + if ((mode == Mode.COUNTER || mode == Mode.COUNTER_OR_TIMER) && messagesReceived >= maxMessagedReceived) { + cancelPubSubSource("BoundedSourceFunction: Max received messages --> canceling source"); + } + } + + private
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664074 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; +import java.util.List; + + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. + */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase implements MessageReceiver, ResultTypeQueryable, ParallelSourceFunction { Review comment: too long This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664117 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. 
+* The ONLY reason to use this is during tests with the emulator provided by Google. +* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(serializableCredentialsProvider); + + if (hostAndPort != null) { + managedChannel = ManagedChannelBuilder +
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663706 ## File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.pubsub; + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; + +import java.math.BigInteger; + +class PubSubPublisher { Review comment: add doc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664027 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.Serializable; + +class SubscriberWrapper implements Serializable { Review comment: add doc This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663809 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -62,6 +62,17 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-pubsub_${scala.binary.version} + ${project.version} + + + com.google.cloud + google-cloud-pubsub + 1.31.0 Review comment: Using a property to define the version number would look better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664134 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. 
+* The ONLY reason to use this is during tests with the emulator provided by Google. +* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(serializableCredentialsProvider); + + if (hostAndPort != null) { + managedChannel = ManagedChannelBuilder +
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211663768 ## File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.pubsub; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; +import java.math.BigInteger; + +class IntegerSerializer implements DeserializationSchema, SerializationSchema { Review comment: add doc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664160 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: Could the required fields be injected through the constructor? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664202 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received. 
+ * + */ +public class BoundedPubSubSource extends PubSubSource { + private Bound bound; + + private BoundedPubSubSource() { + super(); + } + + protected void setBound(Bound bound) { + this.bound = bound; + } + + @Override + public void run(SourceContext sourceContext) { + bound.start(this); + super.run(sourceContext); + } + + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + super.receiveMessage(message, consumer); + bound.receivedMessage(); + } + + /** +* Creates a {@link BoundedPubSubSourceBuilder}. +* @param Type of Object which will be read by the produced {@link BoundedPubSubSource} +*/ + @SuppressWarnings("unchecked") + public static BoundedPubSubSourceBuilder newBuilder() { + return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource()); + } + + /** +* Builder to create BoundedPubSubSource. +* @param Type of Object which will be read by the BoundedPubSubSource +*/ + @SuppressWarnings("unchecked") + public static class BoundedPubSubSourceBuilder, BUILDER extends BoundedPubSubSourceBuilder> extends PubSubSourceBuilder { Review comment: Too long This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664341 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { Review comment: We'd better add a doc for the class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664304 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: Maybe we can report some information as metrics? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r211664234 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received. Review comment: Too long; I suggest breaking the line. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services