[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...

2018-05-09 Thread pluppens
Github user pluppens commented on a diff in the pull request:

https://github.com/apache/flink/pull/5845#discussion_r187085104
  
--- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.serde.IntegerSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test of {@link FlinkPulsarProducer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PulsarClient.class)
+public class FlinkPulsarProducerTest {
+
+   private static final String MOCK_SERVICE_URIL = "http://localhost:8080";
--- End diff --

`MOCK_SERVICE_URL` or `URI`?


---


[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...

2018-05-09 Thread pluppens
Github user pluppens commented on a diff in the pull request:

https://github.com/apache/flink/pull/5845#discussion_r187084563
  
--- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * The supported producing modes of operation for flink's pulsar producer.
+ */
+public enum PulsarProduceMode {
+
+   /**
+* Any produce failures will be ignored hence there could be data loss.
+*/
+   AT_MOST_ONCE,
+
+   /**
+* The producer will ensure that all the events are persisted in pulsar.
+* There could be duplicate events written though.
+*/
+   AT_LEAST_ONE,
--- End diff --

Is this is intentional? `AT_LEAST_ONCE` seems more appropriate?


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Good point. An ugly workaround would be to store a timestamp when the ending 
sequence number is set on a shard, and prune it after a configurable, sufficiently 
large window (e.g. 7 days). That would avoid the dependency on the Kinesis API.
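The timestamp-window workaround could look roughly like this. Everything here is a hypothetical sketch, not the connector's actual API: `ClosedShardPruner`, its method names, and the retention handling are all illustrative.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the workaround described above: remember when a shard's ending
// sequence number was set, and allow its state to be dropped once a
// configurable retention window (e.g. 7 days) has passed.
class ClosedShardPruner {
    private final Duration retention;
    // shardId -> instant at which the shard was observed as closed
    private final Map<String, Instant> closedAt = new ConcurrentHashMap<>();

    ClosedShardPruner(Duration retention) {
        this.retention = retention;
    }

    /** Records the moment a shard's ending sequence number was first seen. */
    void markClosed(String shardId, Instant when) {
        closedAt.putIfAbsent(shardId, when);
    }

    /** True if the shard closed long enough ago that its state can be dropped. */
    boolean isExpired(String shardId, Instant now) {
        Instant closed = closedAt.get(shardId);
        return closed != null && Duration.between(closed, now).compareTo(retention) > 0;
    }
}
```

The trade-off mentioned above still applies: this avoids calling Kinesis at all, at the cost of keeping closed-shard state around for the full window.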


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Regarding the remark from @StephanEwen: perhaps it would be ok to re-use 
the `KinesisProxy` to return a list of all shards and compare them against 
`sequenceNumsToRestore`, pruning any shards that no longer exist. It would 
delay the restoration, but you'd be sure the state couldn't grow indefinitely 
(we were looking at around 1,000 closed shards with a 24-hour retention 
period, so roughly 365k per year - that's not going to end well). Another option 
would be to kick off a periodic task to prune them, but that is likely to 
run into race conditions, so doing it at the safe point of restoration makes 
more sense to me.
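A rough sketch of that restore-time pruning, assuming the restored state is a plain shardId -> sequence-number map and the set of live shard ids has already been fetched (the real code would go through `KinesisProxy`; these names are illustrative, not the connector's actual API):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative restore-time pruning: drop restored sequence numbers for
// shards that no longer exist, so the restored state cannot grow forever
// as old closed shards fall out of the stream's retention.
class RestoredStatePruner {

    /**
     * Removes entries from the restored shardId -> sequence-number map whose
     * shard no longer appears in the live shard listing.
     */
    static void pruneAgainstLiveShards(
            Map<String, String> sequenceNumsToRestore, Set<String> liveShardIds) {
        sequenceNumsToRestore.keySet().retainAll(new HashSet<>(liveShardIds));
    }
}
```

Doing this once, at the safe point of restoration, sidesteps the race conditions a periodic background prune would have to deal with.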


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-31 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Thanks - we've been running it in production for the last 5 days without 
issues, so it seems to work fine. We'll be enabling autoscaling of the streams 
in the coming hours, so if anything is amiss, it should pop up on our radar in 
the coming days.


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-28 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@tzulitai Is there anything more I can do from my side?


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-25 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@bowenli86 Makes sense - I've updated the description to contain the 
initial email/issue. HTH.


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@bowenli86 we're passing the last-seen shardId, and the Kinesis call 
returns only newer shards. Not sure if that answers your remark - because I 
didn't really understand the question either.


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Alright, I've given it a quick stab - but the whole 'remove/update/re-add' 
cycle is kind of ugly due to the hashCode change. Also, I've just copied the test 
from the other example rather than using the harness, so the tests are pretty 
messy.
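For context, the reason the remove/update/re-add cycle is needed can be shown with a simplified stand-in for the shard metadata (this is not the actual `StreamShardMetadata` class; all names here are illustrative): when a field that feeds `hashCode()` mutates while the object is a `HashMap` key, later lookups compute a different bucket, so the entry has to be pulled out before the mutation and re-inserted afterwards.

```java
import java.util.Map;
import java.util.Objects;

// Simplified stand-in demonstrating why a mutable-field key forces the
// remove/update/re-add dance: hashCode() depends on endingSequenceNumber,
// which changes when the shard is closed.
class MutableKeyDemo {
    static final class ShardMeta {
        String shardId;
        String endingSequenceNumber; // mutable: set once the shard closes

        ShardMeta(String shardId) { this.shardId = shardId; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof ShardMeta)) return false;
            ShardMeta s = (ShardMeta) o;
            return Objects.equals(shardId, s.shardId)
                && Objects.equals(endingSequenceNumber, s.endingSequenceNumber);
        }
        @Override public int hashCode() {
            return Objects.hash(shardId, endingSequenceNumber);
        }
    }

    /** Safely mutates a map key: remove under the old hash, change, re-add. */
    static <V> void reKey(Map<ShardMeta, V> map, ShardMeta key, String endingSeqNum) {
        V value = map.remove(key);           // still findable under the old hash
        key.endingSequenceNumber = endingSeqNum;
        map.put(key, value);                 // re-inserted under the new hash
    }
}
```

Mutating the key in place instead (without the remove/re-add) would generally leave the entry stranded in its old bucket, unreachable by `get()`.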


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Just a small remark - from what I understood, the only property that *can* 
change is the endingSequenceNumber - all other state should be considered as 
'set once', so there should be no point in comparing other properties and 
synchronizing them - or did I miss something?


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Ok, that makes sense to me. Give me a bit to cook up both the new test and 
the new approach, and I'll update the PR. Thank you very much for the comments!


---


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Ok, so you'd prefer to synchronize the state of the retrieved shard against 
the stored shards by comparing its stream name and shard id, before doing the 
containsKey() check?


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread pluppens
Github user pluppens commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163226460
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
 
+   /**
+    * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+    * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+    * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+    * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+    * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+    */
+   @Test
+   public void testFindSequenceNumberToRestoreFrom() {
--- End diff --

Makes sense. I'll look into it and see if I can find a way to test it as a 
whole.


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread pluppens
GitHub user pluppens opened a pull request:

https://github.com/apache/flink/pull/5337

[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot 
restoration is able to handle recently closed shards

FLINK-8484: ensure that a state change in the StreamShardMetadata other 
than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not 
result in the shard not being able to be restored. This handles the corner case 
where a shard might have been closed (ending sequence number set to not-null) 
since the last savepoint or checkpoint when a job is restarted from a snapshot 
state.

## Brief change log
 - Created a new method to perform the sequence number lookup
 - Ensure that a lookup for a given existing Kinesis shard does not rely on 
equals(), but rather checks for equality on the stream name and shard id only
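The lookup described in the change log could be sketched as follows, with simplified stand-ins for `StreamShardMetadata` and the restored-state map (field and method names here are illustrative, not the actual connector code):

```java
import java.util.Map;
import java.util.Objects;

// Sketch of the FLINK-8484 fix: match a discovered shard to restored state
// by its immutable identity fields only, so a shard that was closed (ending
// sequence number set) after the snapshot is still found.
class SequenceNumberLookup {
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        String endingSequenceNumber; // may change between snapshot and restore

        ShardMeta(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /**
     * Finds the restored sequence number for a discovered shard by comparing
     * only stream name and shard id instead of full equals().
     */
    static String findSequenceNumberToRestoreFrom(
            ShardMeta discovered, Map<ShardMeta, String> restored) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta stored = entry.getKey();
            if (Objects.equals(stored.streamName, discovered.streamName)
                    && Objects.equals(stored.shardId, discovered.shardId)) {
                return entry.getValue();
            }
        }
        return null; // genuinely new shard: no restored state for it
    }
}
```

The key point is that a full `equals()` check would fail here, because the mutable ending-sequence-number field differs between the snapshotted and the freshly discovered metadata for the same shard.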


## Verifying this change

This change added tests and can be verified as follows:
 - A new unit test was added in `FlinkKinesisConsumerTest` called 
`testFindSequenceNumberToRestoreFrom()` which tests the lookup mechanism

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5337


commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens <philip.luppens@...>
Date:   2018-01-23T08:00:23Z

FLINK-8484: ensure that a state change in the StreamShardMetadata other 
than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not 
result in the shard not being able to be restored. This handles the corner case 
where a shard might have been closed (ending sequence number set to not-null) 
since the last savepoint or checkpoint when a job is restarted from a snapshot 
state.




---