[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r203863637 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + * When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. 
+ */ +class PulsarConsumerSource extends MessageAcknowledgingSourceBase implements PulsarSourceBase { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema deserializer; + + private PulsarClient client; + private Consumer consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); + throw new RuntimeException("null pulsar consumer unable to acknowledge messages"); + } + + if (messageIds.isEmpty()) { + LOG.info("no message ids to acknowledge"); + return;
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909793 --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java --- @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; + +/** + * Tests for the PulsarConsumerSource. The source supports two operation modes. 
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in + *{@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.. + * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after + * after it receives x number of messages. + * + * This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the + * case. The MessageId is used to uniquely identify messages. + */ +public class PulsarConsumerSourceTests { + + private PulsarConsumerSource source; + + private TestConsumer consumer; + + private TestSourceContext context; + + private Thread sourceThread; + + private Exception exception; --- End diff -- will fix ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909775 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +/** + * Base class for pulsar sources. + * @param --- End diff -- will add a comment ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909645 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +/** + * Default values for Pulsar connectors. + */ +public class Defaults { --- End diff -- will remove. ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909617 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + * When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. 
+ */ +class PulsarConsumerSource extends MessageAcknowledgingSourceBase implements PulsarSourceBase { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema deserializer; + + private PulsarClient client; + private Consumer consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); + throw new RuntimeException("null pulsar consumer unable to acknowledge messages"); + } + + if (messageIds.isEmpty()) { + LOG.info("no message ids to acknowledge"); + return;
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199548194 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + * When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. 
+ */ +class PulsarConsumerSource extends MessageAcknowledgingSourceBase implements PulsarSourceBase { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema deserializer; + + private PulsarClient client; + private Consumer consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); + throw new RuntimeException("null pulsar consumer unable to acknowledge messages"); + } + + if (messageIds.isEmpty()) { + LOG.info("no message ids to acknowledge"); + return;
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199546643 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +/** + * Default values for Pulsar connectors. + */ +public class Defaults { --- End diff -- Since this class is used only by `PulsarSourceBuilder`, I think we can move the constants into `PulsarSourceBuilder` and remove this class to reduce the maintenance cost. ---
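The refactoring suggested in this comment could look roughly like the sketch below. It is only an illustration: the contents of `Defaults` are not shown in the diff, so the constant names and values here are assumptions, and `PulsarSourceBuilderSketch` is a hypothetical stand-in for `PulsarSourceBuilder` that reuses only the field names visible in the quoted constructor.

```java
/**
 * Illustrative stand-in for PulsarSourceBuilder; not the code from the pull request.
 */
final class PulsarSourceBuilderSketch {

    // Hypothetical defaults that would otherwise live in a separate Defaults class.
    // Both the names and the values are assumptions made for this sketch.
    private static final long DEFAULT_ACKNOWLEDGEMENT_BATCH_SIZE = 100L;
    private static final String DEFAULT_SUBSCRIPTION_NAME = "flink-pulsar-subscription";

    // Field names follow the ones read in the PulsarConsumerSource constructor.
    String serviceUrl;
    String topic;
    String subscriptionName = DEFAULT_SUBSCRIPTION_NAME;
    long acknowledgementBatchSize = DEFAULT_ACKNOWLEDGEMENT_BATCH_SIZE;

    /** Overrides the default batch size; rejects non-positive values. */
    PulsarSourceBuilderSketch acknowledgementBatchSize(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("acknowledgementBatchSize must be positive");
        }
        this.acknowledgementBatchSize = size;
        return this;
    }

    /** Overrides the default subscription name; rejects null or empty names. */
    PulsarSourceBuilderSketch subscriptionName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("subscriptionName must not be empty");
        }
        this.subscriptionName = name;
        return this;
    }
}
```

Keeping the constants private to the builder means there is one less class to maintain, and the defaults sit next to the only code that reads them.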
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199549195 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +/** + * Base class for pulsar sources. + * @param --- End diff -- describe the generic type. ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199551693 --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java --- @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; + +/** + * Tests for the PulsarConsumerSource. The source supports two operation modes. 
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in + *{@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.. + * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after + * after it receives x number of messages. + * + * This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the + * case. The MessageId is used to uniquely identify messages. + */ +public class PulsarConsumerSourceTests { + + private PulsarConsumerSource source; + + private TestConsumer consumer; + + private TestSourceContext context; + + private Thread sourceThread; + + private Exception exception; --- End diff -- So many blank lines are not necessary here. ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
GitHub user cckellogg opened a pull request:

https://github.com/apache/flink/pull/6200

[FLINK-9641] [streaming-connectors] Flink pulsar source connector

## What is the purpose of the change

This pull request adds a [Pulsar](https://github.com/apache/incubator-pulsar) source connector, which enables Flink jobs to process messages from Pulsar topics.

## Brief change log

- Add a PulsarConsumerSource connector

## Verifying this change

This change adds unit tests to verify checkpointing and batch message acknowledgements.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6200.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6200

commit b69fb21dc82e7922f7b7e65c94c154d56e442e5e
Author: Chris
Date: 2018-06-20T21:53:06Z

Add a simple pulsar source connector.

commit fb170c435abb2b2e09913a0430d2f73dc1edbbe1
Author: Chris
Date: 2018-06-21T00:03:12Z

Remove metrics class and add max ack batch size.

---
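The batch message acknowledgement mentioned in the description (acknowledging after a fixed number of received messages when checkpointing is disabled) can be sketched in isolation as follows. This is a minimal sketch and not the connector's code: `BatchAcknowledger` is a hypothetical helper, the `cumulativeAck` callback is an assumed stand-in for whatever acknowledgement call the source makes on the Pulsar consumer, and only the counter names `acknowledgementBatchSize`, `batchCount`, and `totalMessageCount` come from the quoted source.

```java
import java.util.function.Consumer;

/**
 * Hypothetical helper showing count-based acknowledgement; not part of the pull request.
 */
final class BatchAcknowledger<I> {

    private final long acknowledgementBatchSize;
    // Assumed stand-in for the real acknowledgement call made against the broker.
    private final Consumer<I> cumulativeAck;

    private long batchCount;
    private long totalMessageCount;

    BatchAcknowledger(long acknowledgementBatchSize, Consumer<I> cumulativeAck) {
        if (acknowledgementBatchSize <= 0) {
            throw new IllegalArgumentException("acknowledgementBatchSize must be positive");
        }
        this.acknowledgementBatchSize = acknowledgementBatchSize;
        this.cumulativeAck = cumulativeAck;
    }

    /** Records one received message and acknowledges once a full batch has been seen. */
    void onMessage(I messageId) {
        totalMessageCount++;
        batchCount++;
        if (batchCount >= acknowledgementBatchSize) {
            // Acknowledge up to and including the latest message id, then start a new batch.
            cumulativeAck.accept(messageId);
            batchCount = 0;
        }
    }

    long getTotalMessageCount() {
        return totalMessageCount;
    }
}
```

With a batch size of 3 and message ids 1 through 7, the callback fires for ids 3 and 6, and id 7 stays unacknowledged until the next batch fills. A failure before an acknowledgement leads to redelivery of up to a batch of messages, while messages acknowledged before downstream processing has finished can be lost, consistent with the "messages may be dropped" note in the Javadoc.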