[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...
Github user pluppens commented on a diff in the pull request: https://github.com/apache/flink/pull/5845#discussion_r187085104

--- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java ---

@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.serde.IntegerSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test of {@link FlinkPulsarProducer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PulsarClient.class)
+public class FlinkPulsarProducerTest {
+
+	private static final String MOCK_SERVICE_URIL = "http://localhost:8080";

--- End diff --

`MOCK_SERVICE_URL` or `URI`?

---
[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...
Github user pluppens commented on a diff in the pull request: https://github.com/apache/flink/pull/5845#discussion_r187084563

--- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java ---

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * The supported producing modes of operation for flink's pulsar producer.
+ */
+public enum PulsarProduceMode {
+
+	/**
+	 * Any produce failures will be ignored hence there could be data loss.
+	 */
+	AT_MOST_ONCE,
+
+	/**
+	 * The producer will ensure that all the events are persisted in pulsar.
+	 * There could be duplicate events written though.
+	 */
+	AT_LEAST_ONE,

--- End diff --

Is this intentional? `AT_LEAST_ONCE` seems more appropriate?

---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Good point. An ugly workaround would be to store a timestamp when the ending sequence number is set on a shard, and prune after a configurable, sufficiently large window (e.g. 7 days). It would avoid the dependency on the Kinesis API. ---
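The timestamp-based workaround above could be sketched roughly as follows. This is illustrative only and not the connector's actual API: the `ClosedShardPruner` class, the `closedAt` field, and the seven-day default are all hypothetical names standing in for "record when a shard was observed closed, and drop it once a configurable window has elapsed".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of TTL-based pruning of closed-shard state.
public class ClosedShardPruner {

	// Assumed default window; in the discussion above, "e.g. 7 days".
	public static final Duration DEFAULT_RETENTION_WINDOW = Duration.ofDays(7);

	/** Per-shard bookkeeping: when (if ever) the shard was observed closed. */
	public static class ShardState {
		final Instant closedAt; // null while the shard is still open

		public ShardState(Instant closedAt) {
			this.closedAt = closedAt;
		}
	}

	/**
	 * Removes entries whose ending sequence number was set longer ago than
	 * the retention window. Open shards (closedAt == null) are always kept.
	 * Returns the number of entries removed.
	 */
	public static int prune(Map<String, ShardState> shards, Duration retentionWindow, Instant now) {
		int removed = 0;
		Iterator<Map.Entry<String, ShardState>> it = shards.entrySet().iterator();
		while (it.hasNext()) {
			ShardState state = it.next().getValue();
			if (state.closedAt != null && state.closedAt.plus(retentionWindow).isBefore(now)) {
				it.remove();
				removed++;
			}
		}
		return removed;
	}
}
```

The trade-off mentioned in the thread applies here: the window must comfortably exceed the stream's retention period, or a restore could drop a shard whose records are still readable.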
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Regarding the remark from @StephanEwen: perhaps it would be ok to re-use the `KinesisProxy` to return a list of all shards and compare them against `sequenceNumsToRestore` to prune any shards that no longer exist? It would delay the restoration, but you'd be sure the state wouldn't grow indefinitely (we were looking at around 1,000 closed shards a day with a 24-hour retention period, so 365k per year - that's not going to end well). Another option would be to kick off another task periodically to prune them, but that is likely to run into race conditions, so doing it at the safe point of restoration would make more sense to me. ---
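The prune-at-restore idea above amounts to a set intersection between the restored state and the shards the stream currently reports. A minimal sketch, with `RestoreTimePruning` and its method names as hypothetical stand-ins (the shard listing would come from something like `KinesisProxy` in the real connector):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: at restore time, drop restored sequence numbers for
// shards that the stream no longer reports, so state cannot grow forever.
public class RestoreTimePruning {

	/**
	 * Keeps only the restored entries whose shard id is still known to the
	 * stream. Mutates the map in place, mirroring a one-off cleanup done
	 * once during restoration rather than a periodic background task.
	 */
	public static <V> void retainExistingShards(Map<String, V> sequenceNumsToRestore, Set<String> currentShardIds) {
		sequenceNumsToRestore.keySet().retainAll(currentShardIds);
	}
}
```

Doing this once at the "safe point" of restoration, as suggested above, avoids the race conditions a concurrent periodic pruner would invite.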
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Thanks - we've been running it in production for the last 5 days without issues, so it seems to work fine. We'll be enabling autoscaling of the streams in the coming hours, so if anything is amiss, it should pop up on our radar in the coming days. ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @tzulitai Is there anything more I can do from my side? ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @bowenli86 Makes sense - I've updated the description to contain the initial email/issue. HTH. ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @bowenli86 we're passing the last-seen shardId, and the Kinesis call returns only newer shards. Not sure if that answers your remark - because I didn't really understand the question either. ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Alright, I've given it a quick stab - but the whole 'remove/update/re-add' cycle is kind of ugly due to the hashcode change. Also, I've just copied the test from the other example rather than using the harness, so the tests are pretty messy. ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Just a small remark - from what I understood, the only property that *can* change is the endingSequenceNumber - all other state should be considered as 'set once', so there should be no point in comparing other properties and synchronizing them - or did I miss something? ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Ok, that makes sense to me. Give me a bit to cook up both the new test and the new approach, and I'll update the PR. Thank you very much for the comments! ---
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Ok, so you'd prefer to synchronize the state of the retrieve shard against the stored shards by comparing its stream name and shard id, before doing the containsKey() check? ---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user pluppens commented on a diff in the pull request: https://github.com/apache/flink/pull/5337#discussion_r163226460

--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---

@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
 		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
 	}
+
+	/**
+	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+	 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+	 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+	 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+	 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+	 */
+	@Test
+	public void testFindSequenceNumberToRestoreFrom() {

--- End diff --

Makes sense. I'll look into it and see if I can find a way to test it as a whole.

---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
GitHub user pluppens opened a pull request:

https://github.com/apache/flink/pull/5337

[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot restoration is able to handle recently closed shards

FLINK-8484: ensure that a state change in the StreamShardMetadata other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not result in the shard not being able to be restored. This handles the corner case where a shard might have been closed (ending sequence number set to not-null) since the last savepoint or checkpoint when a job is restarted from a snapshot state.

## Brief change log

- Created a new method to perform the sequence number lookup
- Ensure that a lookup for a given existing Kinesis shard does not rely on equals(), but rather checks for equality on the stream name and shard id only

## Verifying this change

This change added tests and can be verified as follows:

- A new unit test was added in `FlinkKinesisConsumerTest` called `testFindSequenceNumberToRestoreFrom()` which tests the lookup mechanism

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5337

commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens <philip.luppens@...>
Date: 2018-01-23T08:00:23Z

    FLINK-8484: ensure that a state change in the StreamShardMetadata other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not result in the shard not being able to be restored. This handles the corner case where a shard might have been closed (ending sequence number set to not-null) since the last savepoint or checkpoint when a job is restarted from a snapshot state.

---
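The lookup described in the change log - matching restored shard state on stream name and shard id only, instead of relying on `StreamShardMetadata.equals()` - can be sketched as below. The `ShardKey` class and method signature are illustrative stand-ins, not the connector's actual types; the point is that `endingSequenceNumber` is deliberately excluded from the match, since it is the one field that can legitimately change between snapshot and restore.

```java
import java.util.Map;

// Hypothetical sketch of a streamName/shardId-only lookup of restored
// sequence numbers, robust to a shard being closed after the snapshot.
public class SequenceNumberLookup {

	/** Minimal stand-in for the shard metadata relevant to the lookup. */
	public static class ShardKey {
		final String streamName;
		final String shardId;
		final String endingSequenceNumber; // may flip from null to set after a re-shard

		public ShardKey(String streamName, String shardId, String endingSequenceNumber) {
			this.streamName = streamName;
			this.shardId = shardId;
			this.endingSequenceNumber = endingSequenceNumber;
		}
	}

	/**
	 * Returns the restored sequence number for the given shard, or null if
	 * no restored entry matches its stream name and shard id.
	 */
	public static String findSequenceNumberToRestoreFrom(ShardKey shard, Map<ShardKey, String> restored) {
		for (Map.Entry<ShardKey, String> entry : restored.entrySet()) {
			ShardKey candidate = entry.getKey();
			// Ignore endingSequenceNumber on purpose: equals() on the full
			// metadata would fail once the shard is closed after a re-shard.
			if (candidate.streamName.equals(shard.streamName)
					&& candidate.shardId.equals(shard.shardId)) {
				return entry.getValue();
			}
		}
		return null;
	}
}
```

With this shape, a shard snapshotted while open (ending sequence number null) still finds its restored sequence number after being closed by a re-shard, which is exactly the corner case the PR description calls out.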