[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-26 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212852318
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
 
 Review comment:
   Sorry, I missed your `PubSubSinkBuilder`. I just noticed that the 
`initialize` method in this class performs many null checks. If these fields 
are mandatory, you could pass them directly to the constructor of 
`PubSubSinkBuilder`. Would that make these checks unnecessary?
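
The idea in this comment can be sketched roughly as follows. This is only an illustration, not the connector's code: `SinkConfig` and `SinkConfigBuilder` are hypothetical stand-ins for `PubSubSink` and `PubSubSinkBuilder`, and the serialization schema and credentials are left out to keep the example self-contained. Mandatory values go through the builder's constructor and are validated there, so no later `initialize()` step has to check them again.

```java
import java.util.Objects;

/** Hypothetical stand-in for the sink: mandatory fields are final and set once. */
final class SinkConfig {
    private final String projectName;
    private final String topicName;
    private final String hostAndPort; // optional, may be null

    SinkConfig(String projectName, String topicName, String hostAndPort) {
        this.projectName = projectName;
        this.topicName = topicName;
        this.hostAndPort = hostAndPort;
    }

    String projectName() { return projectName; }
    String topicName() { return topicName; }
    String hostAndPort() { return hostAndPort; }
}

/** Hypothetical builder: mandatory values are constructor arguments. */
final class SinkConfigBuilder {
    private final String projectName;
    private final String topicName;
    private String hostAndPort; // optional, only needed for emulator tests

    SinkConfigBuilder(String projectName, String topicName) {
        // Fail fast here instead of in a separate initialize() call.
        this.projectName = Objects.requireNonNull(projectName, "projectName");
        this.topicName = Objects.requireNonNull(topicName, "topicName");
    }

    SinkConfigBuilder withHostAndPort(String hostAndPort) {
        this.hostAndPort = hostAndPort;
        return this;
    }

    SinkConfig build() {
        return new SinkConfig(projectName, topicName, hostAndPort);
    }
}
```

With this shape, `new SinkConfigBuilder("my-project", "my-topic").build()` cannot produce an object with a missing project or topic name, while optional settings such as the emulator host keep their `with...` methods.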


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-26 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212851768
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long 
maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound 
boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, 
maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   
cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > 
maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already 
called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new IllegalStateException("start() not called");
+   }
+
+   lastReceivedMessage = System.currentTimeMillis();
+   messagesReceived++;
 
 Review comment:
   Yes, I meant exposing the number of messages received as a metric. It is 
just an idea; I am not sure whether it is a good suggestion.
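
A rough sketch of what this could look like, under stated assumptions: `MessageCounter`, `SimpleMessageCounter` and `CountingBound` are hypothetical names used only for illustration, and the counter is a plain in-memory stand-in. In Flink, such a counter would presumably be obtained from the runtime context's metric group; that wiring is not shown here and is not part of the code under review.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical minimal counter abstraction, standing in for a metrics counter. */
interface MessageCounter {
    void inc();
    long getCount();
}

/** In-memory implementation used only for this sketch. */
final class SimpleMessageCounter implements MessageCounter {
    private final LongAdder count = new LongAdder();

    @Override
    public void inc() { count.increment(); }

    @Override
    public long getCount() { return count.sum(); }
}

/** Hypothetical, simplified variant of a counter-based bound that reports to a counter. */
final class CountingBound {
    private final long maxMessagesReceived;
    private final MessageCounter counter;
    private boolean cancelled = false;

    CountingBound(long maxMessagesReceived, MessageCounter counter) {
        this.maxMessagesReceived = maxMessagesReceived;
        this.counter = counter;
    }

    synchronized void receivedMessage() {
        // The same value drives the bound and is visible to whoever reads the counter.
        counter.inc();
        if (!cancelled && counter.getCount() >= maxMessagesReceived) {
            cancelled = true; // the real class would cancel the source function here
        }
    }

    synchronized boolean isCancelled() { return cancelled; }
}
```

Keeping the count behind a small interface means the bound logic does not depend on a particular metrics library, and the in-memory implementation can be swapped for a real metric when the class runs inside a job.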



[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664265
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long 
maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound 
boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, 
maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   
cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > 
maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already 
called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new IllegalStateException("start() not called");
+   }
+
+   lastReceivedMessage = System.currentTimeMillis();
+   messagesReceived++;
+
+   if ((mode == Mode.COUNTER || mode == Mode.COUNTER_OR_TIMER) && 
messagesReceived >= maxMessagedReceived) {
+   cancelPubSubSource("BoundedSourceFunction: Max received 
messages --> canceling source");
+   }
+   }
+
+   private

[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664074
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them as soon as they have been received.
+ */
+public class PubSubSource extends 
MultipleIdsMessageAcknowledgingSourceBase 
implements MessageReceiver, ResultTypeQueryable, 
ParallelSourceFunction {
 
 Review comment:
   This line is too long.




[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664117
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
+   }
+
+   void setSerializableCredentialsProvider(SerializableCredentialsProvider 
serializableCredentialsProvider) {
+   this.serializableCredentialsProvider = 
serializableCredentialsProvider;
+   }
+
+   void setSerializationSchema(SerializationSchema 
serializationSchema) {
+   this.serializationSchema = serializationSchema;
+   }
+
+   void setProjectName(String projectName) {
+   this.projectName = projectName;
+   }
+
+   void setTopicName(String topicName) {
+   this.topicName = topicName;
+   }
+
+   /**
+* Set the custom hostname/port combination of PubSub.
+* The ONLY reason to use this is during tests with the emulator 
provided by Google.
+*
+* @param hostAndPort The combination of hostname and port to connect 
to ("hostname:1234")
+*/
+   void withHostAndPort(String hostAndPort) {
+   this.hostAndPort = hostAndPort;
+   }
+
+   void initialize() throws IOException {
+   if (serializableCredentialsProvider == null) {
+   serializableCredentialsProvider = 
SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+   }
+   if (serializationSchema == null) {
+   throw new IllegalArgumentException("The 
serializationSchema has not been specified.");
+   }
+   if (projectName == null) {
+   throw new IllegalArgumentException("The projectName has 
not been specified.");
+   }
+   if (topicName == null) {
+   throw new IllegalArgumentException("The topicName has 
not been specified.");
+   }
+   }
+
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   
.setCredentialsProvider(serializableCredentialsProvider);
+
+   if (hostAndPort != null) {
+   managedChannel = ManagedChannelBuilder
+ 

[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211663706
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+class PubSubPublisher {
 
 Review comment:
   Please add Javadoc for this class.




[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664027
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.core.ApiService;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.Serializable;
+
+class SubscriberWrapper implements Serializable {
 
 Review comment:
   Please add Javadoc for this class.




[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211663809
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -62,6 +62,17 @@ under the License.
${project.version}

 
+   
+   org.apache.flink
+   
flink-connector-pubsub_${scala.binary.version}
+   ${project.version}
+   
+   
+   com.google.cloud
+   google-cloud-pubsub
+   1.31.0
 
 Review comment:
   It would be better to define the version number in a property.




[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664134
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
+   }
+
+   void setSerializableCredentialsProvider(SerializableCredentialsProvider 
serializableCredentialsProvider) {
+   this.serializableCredentialsProvider = 
serializableCredentialsProvider;
+   }
+
+   void setSerializationSchema(SerializationSchema 
serializationSchema) {
+   this.serializationSchema = serializationSchema;
+   }
+
+   void setProjectName(String projectName) {
+   this.projectName = projectName;
+   }
+
+   void setTopicName(String topicName) {
+   this.topicName = topicName;
+   }
+
+   /**
+* Set the custom hostname/port combination of PubSub.
+* The ONLY reason to use this is during tests with the emulator 
provided by Google.
+*
+* @param hostAndPort The combination of hostname and port to connect 
to ("hostname:1234")
+*/
+   void withHostAndPort(String hostAndPort) {
+   this.hostAndPort = hostAndPort;
+   }
+
+   void initialize() throws IOException {
+   if (serializableCredentialsProvider == null) {
+   serializableCredentialsProvider = 
SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+   }
+   if (serializationSchema == null) {
+   throw new IllegalArgumentException("The 
serializationSchema has not been specified.");
+   }
+   if (projectName == null) {
+   throw new IllegalArgumentException("The projectName has 
not been specified.");
+   }
+   if (topicName == null) {
+   throw new IllegalArgumentException("The topicName has 
not been specified.");
+   }
+   }
+
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   
.setCredentialsProvider(serializableCredentialsProvider);
+
+   if (hostAndPort != null) {
+   managedChannel = ManagedChannelBuilder
+ 

[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211663768
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+class IntegerSerializer implements DeserializationSchema, 
SerializationSchema {
 
 Review comment:
   Please add Javadoc for this class.




[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664160
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
 
 Review comment:
   Could the required fields be injected through the constructor?
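
   A minimal sketch of what this suggestion could look like, assuming the
   project and topic names are the required fields and the endpoint override
   is optional. `ConstructorInjectedSink` is a hypothetical stand-in, not the
   class in this PR, and it deliberately leaves out the Flink and Pub/Sub
   types so that it compiles on its own:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for the sink under review; it does not extend any Flink class.
final class ConstructorInjectedSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Required values are final and checked once, at construction time.
    private final String projectName;
    private final String topicName;
    // Optional endpoint override (assumption: null means "use the default endpoint").
    private final String hostAndPort;

    ConstructorInjectedSink(String projectName, String topicName, String hostAndPort) {
        this.projectName = Objects.requireNonNull(projectName, "projectName");
        this.topicName = Objects.requireNonNull(topicName, "topicName");
        this.hostAndPort = hostAndPort;
    }

    // Fully qualified topic path in the projects/<project>/topics/<topic> form.
    String topicPath() {
        return "projects/" + projectName + "/topics/" + topicName;
    }

    boolean usesCustomEndpoint() {
        return hostAndPort != null;
    }
}
```

   With the required values passed through the constructor, the fields can be
   `final` and a half-configured instance cannot be created.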


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664202
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received.
+ *
+ */
+public class BoundedPubSubSource extends PubSubSource {
+   private Bound bound;
+
+   private BoundedPubSubSource() {
+   super();
+   }
+
+   protected void setBound(Bound bound) {
+   this.bound = bound;
+   }
+
+   @Override
+   public void run(SourceContext sourceContext) {
+   bound.start(this);
+   super.run(sourceContext);
+   }
+
+   @Override
+   public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+   super.receiveMessage(message, consumer);
+   bound.receivedMessage();
+   }
+
+   /**
+* Creates a {@link BoundedPubSubSourceBuilder}.
+* @param  Type of Object which will be read by the produced {@link BoundedPubSubSource}
+*/
+   @SuppressWarnings("unchecked")
+   public static  BoundedPubSubSourceBuilder newBuilder() {
+   return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource());
+   }
+
+   /**
+* Builder to create BoundedPubSubSource.
+* @param  Type of Object which will be read by the BoundedPubSubSource
+*/
+   @SuppressWarnings("unchecked")
+   public static class BoundedPubSubSourceBuilder, BUILDER extends BoundedPubSubSourceBuilder> extends PubSubSourceBuilder {
 
 Review comment:
   This line is too long; please break it.


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664341
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
 
 Review comment:
   Please add a Javadoc comment for this class.


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664304
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new IllegalStateException("start() not called");
+   }
+
+   lastReceivedMessage = System.currentTimeMillis();
+   messagesReceived++;
 
 Review comment:
   Maybe we can report some of this information, such as the number of messages received, as metrics?
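
   One possible reading of this suggestion, sketched without Flink on the
   classpath. `MessageCounter`, `AtomicMessageCounter` and `CountingBound` are
   hypothetical names; in the connector itself the counter would more likely
   be a Flink `Counter` registered on the metric group of the function's
   runtime context.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal counter abstraction, standing in for a real metrics API.
interface MessageCounter {
    void inc();

    long getCount();
}

// Simple thread-safe implementation, used here only to keep the sketch runnable.
final class AtomicMessageCounter implements MessageCounter {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public long getCount() {
        return count.get();
    }
}

// Simplified, counter-only variant of the bound logic with a metric hook.
final class CountingBound {
    private final long maxMessagesReceived;
    private final MessageCounter receivedCounter;
    private long messagesReceived;

    CountingBound(long maxMessagesReceived, MessageCounter receivedCounter) {
        this.maxMessagesReceived = maxMessagesReceived;
        this.receivedCounter = receivedCounter;
    }

    // Records one message and returns true once the configured bound is reached.
    synchronized boolean receivedMessage() {
        messagesReceived++;
        receivedCounter.inc(); // expose the same information as a metric
        return messagesReceived >= maxMessagesReceived;
    }
}
```

   Keeping the counter behind an interface means the bound logic can be unit
   tested without a running job.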


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-21 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r211664234
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received.
 
 Review comment:
   This line is too long; I suggest breaking it.

