[GitHub] jerrypeng commented on issue #1771: setting auto ack to be true for sources and sinks

2018-05-11 Thread GitBox
jerrypeng commented on issue #1771: setting auto ack to be true for sources and 
sinks
URL: https://github.com/apache/incubator-pulsar/pull/1771#issuecomment-388528354
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1770: remove unnecessary log message in sink cmd

2018-05-11 Thread GitBox
jerrypeng commented on issue #1770: remove unnecessary log message in sink cmd
URL: https://github.com/apache/incubator-pulsar/pull/1770#issuecomment-388528334
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1769: refactor source and sink classname to be pulsar source and sink when not set

2018-05-11 Thread GitBox
jerrypeng commented on issue #1769: refactor source and sink classname to be 
pulsar source and sink when not set
URL: https://github.com/apache/incubator-pulsar/pull/1769#issuecomment-388528292
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1771: setting auto ack to be true for sources and sinks

2018-05-11 Thread GitBox
jerrypeng opened a new pull request #1771: setting auto ack to be true for 
sources and sinks
URL: https://github.com/apache/incubator-pulsar/pull/1771
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on issue #1764: WIP - Pulsar Go client library
URL: https://github.com/apache/incubator-pulsar/pull/1764#issuecomment-388510252
 
 
   @bruth Addressed most of the comments 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1770: remove unnecessary log message in sink cmd

2018-05-11 Thread GitBox
jerrypeng opened a new pull request #1770: remove unnecessary log message in 
sink cmd
URL: https://github.com/apache/incubator-pulsar/pull/1770
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187753302
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
 
 Review comment:
   Ok, the chicken/egg problem now is that this is not available in final repo. 
I'll update the examples once the PR is checked in


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187753201
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
+   }
 
 Review comment:
   Yes, on websocket it's a bit different because there is no explicit flow 
control when using a reader. The "ack" is the only way to control delivery to a 
reader through websocket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187752900
 
 

 ##
 File path: pulsar-client-go/examples/consumer/consumer.go
 ##
 @@ -0,0 +1,54 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := consumer.Receive()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
 
 Review comment:
   Sure, abbreviated looks better, though I prefer `Id()` since it's not 
technically an acronym but an abbreviation. `Ack()` , `Id()`... 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187752774
 
 

 ##
 File path: pulsar-client-go/examples/consumer/consumer.go
 ##
 @@ -0,0 +1,54 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := consumer.Receive()
 
 Review comment:
   Actually, I've come to conclusion to remove `Receive()` and `Next()` with 
timeouts. If one wants to have that, the channel listener can be used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1769: refactor source and sink classname to be pulsar source and sink when not set

2018-05-11 Thread GitBox
jerrypeng commented on issue #1769: refactor source and sink classname to be 
pulsar source and sink when not set
URL: https://github.com/apache/incubator-pulsar/pull/1769#issuecomment-388508529
 
 
   @srkukarni @sijie please review


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1769: refactor source and sink classname to be pulsar source and sink when not set

2018-05-11 Thread GitBox
jerrypeng opened a new pull request #1769: refactor source and sink classname 
to be pulsar source and sink when not set
URL: https://github.com/apache/incubator-pulsar/pull/1769
 
 
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187752595
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   MessageListener(channel).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   // Receive messages from channel
+   for cm := range channel {
+   consumer := cm.Consumer
+   msg := cm.Message
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
+
+   consumer.Acknowledge(msg)
 
 Review comment:
   `Ack()` sounds good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on a change in pull request #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on a change in pull request #1758: Windowing for Pulsar 
Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#discussion_r187732990
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 ##
 @@ -21,16 +21,22 @@
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.io.core.Source;
 
 import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.windowing.Window;
 
 Review comment:
   correct I will remove


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
srkukarni commented on a change in pull request #1758: Windowing for Pulsar 
Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#discussion_r187732720
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 ##
 @@ -21,16 +21,22 @@
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.io.core.Source;
 
 import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.windowing.Window;
 
 Review comment:
   i presume these imports are no longer used?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388469042
 
 
   @srkukarni thanks for the review. I have addressed your comments


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388435528
 
 
   I discussed with @srkukarni and we are going to leave the KakfaSource as it 
is for now


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388434193
 
 
   @srkukarni i think that is a good approach. Let me refactor the code


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on issue #1768: InputStream to read from S3

2018-05-11 Thread GitBox
ivankelly commented on issue #1768: InputStream to read from S3
URL: https://github.com/apache/incubator-pulsar/pull/1768#issuecomment-388395548
 
 
   retest this please //  V1_ReplicatorTest.testCloseReplicatorStartProducer


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader

2018-05-11 Thread GitBox
ivankelly commented on a change in pull request #1746: PIP-17: impl offload() 
for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187596277
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 ##
 @@ -60,28 +77,96 @@ public static S3ManagedLedgerOffloader 
create(ServiceConfiguration conf,
 } else {
 builder.setRegion(region);
 }
-return new S3ManagedLedgerOffloader(builder.build(), bucket, 
scheduler);
+return new S3ManagedLedgerOffloader(builder.build(), bucket, 
scheduler, maxBlockSize);
 }
 
-S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
ScheduledExecutorService scheduler) {
+S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
ScheduledExecutorService scheduler, int maxBlockSize) {
 this.s3client = s3client;
 this.bucket = bucket;
 this.scheduler = scheduler;
+this.maxBlockSize = maxBlockSize;
 }
 
+static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+return String.format("ledger-%d-%s", readHandle.getId(), 
uuid.toString());
+}
+
+static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+return String.format("ledger-%d-%s-index", readHandle.getId(), 
uuid.toString());
+}
+
+// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new 
Block,
+// because the smallest size for MultiPartUpload is 5MB, which is too big 
for it at present.
 
 Review comment:
   I would remove this comment. It's not rely relevant anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader

2018-05-11 Thread GitBox
ivankelly commented on a change in pull request #1746: PIP-17: impl offload() 
for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187597507
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 ##
 @@ -111,5 +186,87 @@ public void testNoBucketConfigured() throws Exception {
 // correct
 }
 }
+
+@Test
+public void testOffload() throws Exception {
+int entryLength = 10;
+int entryNumberEachBlock = 10;
+ServiceConfiguration conf = new ServiceConfiguration();
+
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+
+conf.setS3ManagedLedgerOffloadBucket(BUCKET);
+conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
+conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
+conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(
+DataBlockHeaderImpl.getDataStartOffset() + (entryLength + 12) * 
entryNumberEachBlock);
+LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, 
scheduler);
+
+// offload 30 entries, which will be placed into 3 data blocks.
+int entryCount = 30;
+ReadHandle readHandle = buildReadHandle(entryCount);
+UUID uuid = UUID.randomUUID();
+offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
+S3Object obj = s3client.getObject(BUCKET, 
S3ManagedLedgerOffloader.dataBlockOffloadKey(readHandle, uuid));
+S3Object indexObj = s3client.getObject(BUCKET, 
S3ManagedLedgerOffloader.indexBlockOffloadKey(readHandle, uuid));
+
+verifyS3ObjectRead(obj, indexObj, readHandle, 3, 30, 
conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+}
+
+@Test
+public void testOffloadFail() throws Exception {
 
 Review comment:
   Break this isn't multiple tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader

2018-05-11 Thread GitBox
ivankelly commented on a change in pull request #1746: PIP-17: impl offload() 
for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187597119
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 ##
 @@ -18,30 +18,40 @@
  */
 package org.apache.pulsar.broker.s3offload;
 
+import static org.mockito.Matchers.any;
+import static org.testng.Assert.assertEquals;
 
 Review comment:
   why import static when Assert is already pulled in? I've seen other people 
seem to prefer the static as well. I'm just wondering what the reason behind it 
is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader

2018-05-11 Thread GitBox
ivankelly commented on a change in pull request #1746: PIP-17: impl offload() 
for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187597447
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 ##
 @@ -111,5 +186,87 @@ public void testNoBucketConfigured() throws Exception {
 // correct
 }
 }
+
+@Test
+public void testOffload() throws Exception {
+int entryLength = 10;
+int entryNumberEachBlock = 10;
+ServiceConfiguration conf = new ServiceConfiguration();
+
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+
+conf.setS3ManagedLedgerOffloadBucket(BUCKET);
+conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
+conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
+conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(
+DataBlockHeaderImpl.getDataStartOffset() + (entryLength + 12) * 
entryNumberEachBlock);
+LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, 
scheduler);
+
+// offload 30 entries, which will be placed into 3 data blocks.
+int entryCount = 30;
+ReadHandle readHandle = buildReadHandle(entryCount);
+UUID uuid = UUID.randomUUID();
+offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
+S3Object obj = s3client.getObject(BUCKET, 
S3ManagedLedgerOffloader.dataBlockOffloadKey(readHandle, uuid));
+S3Object indexObj = s3client.getObject(BUCKET, 
S3ManagedLedgerOffloader.indexBlockOffloadKey(readHandle, uuid));
+
+verifyS3ObjectRead(obj, indexObj, readHandle, 3, 30, 
conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+}
+
+@Test
+public void testOffloadFail() throws Exception {
+int entryLength = 10;
+int entryNumberEachBlock = 10;
+int maxBlockSize = DataBlockHeaderImpl.getDataStartOffset() + 
(entryLength + 12) * entryNumberEachBlock;
+
+// offload 30 entries, which will be placed into 3 data blocks.
+int entryCount = 30;
+ReadHandle readHandle = buildReadHandle(entryCount);
+UUID uuid = UUID.randomUUID();
+
+// mock throw exception when initiateMultipartUpload
+try {
+AmazonS3 mockS3client = Mockito.spy(s3client);
+
Mockito.when(mockS3client.initiateMultipartUpload(any())).thenThrow(AmazonServiceException.class);
+LedgerOffloader offloader = new 
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize);
+offloader.offload(readHandle, uuid, new HashMap<>()).get();
+fail("Should throw exception when initiateMultipartUpload");
+} catch (Exception e) {
+// excepted
 
 Review comment:
   Check that the exception thrown is the one we expect to be thrown (i.e. the 
AmazonServiceException)
   Also verify that neither object exists in S3. Do the same for the other 
tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly opened a new pull request #1768: InputStream to read from S3

2018-05-11 Thread GitBox
ivankelly opened a new pull request #1768: InputStream to read from S3
URL: https://github.com/apache/incubator-pulsar/pull/1768
 
 
   Reads from S3 in configurable chunks. Interface allows caller to seek
   to an arbitrary position.
   
   Master Issue: #1511
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on issue #1134: Docker compose files for pulsar

2018-05-11 Thread GitBox
ivankelly commented on issue #1134: Docker compose files for pulsar
URL: https://github.com/apache/incubator-pulsar/pull/1134#issuecomment-388326182
 
 
   @merlimat @aahmed-se I reworked these when checking 2.0 rc3. I think they're 
ready to merge now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187572451
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
 
 Review comment:
   Hm, yes default zero values can be a nuisance. There are a couple patterns 
for this case, but they are escaping me at the moment. One more general 
alternative is to stick with the `New*` functions to initialize a client, 
consumer, producer, and reader value with the defaults preset on the struct 
which then allows the user to override the options they want. The `New` call 
could take the required options.
   
   ```go
   // Constructor with required options.
   c := NewConsumer(topic)
   // Override default
   c.Timeout = 10*time.Second
   
   // Rest..
   if err := c.Subscribe(); err != nil {
   }
   ```
   
   So in this set, trading the builder pattern for a simple struct that is 
returned that can be mutated before making the connection.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187570833
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
+   }
 
 Review comment:
   Ok I thought so actually for the binary protocol.. I got confused because 
the Websocket docs requires the reader to ack (whether that is actually 
required or not on the server, I am not sure).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187570569
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
 
 Review comment:
   Fair enough, maybe then just `Next()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187570215
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
 
 Review comment:
   Ah right, they are objects. Yea `const` only works with primitive types. You 
can use pre-defined variables in this case and check identity.
   
   ```go
   var (
 EarliestMessage MessageId = newMessageId()
 LatestMessage   MessageId = newMessageId()
   )
   reader := {
 StartMessage: EarliestMessage,
   }
   
   // Implementation..
   switch reader.StartMessage {
   case EarliestMessage:
 // ..
   case LatestMessage:
 // ..
   case nil:
 // ..
   default:
 // user defined..
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187566247
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   MessageListener(channel).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   // Receive messages from channel
+   for cm := range channel {
+   consumer := cm.Consumer
 
 Review comment:
   Ah oh ok, the shared channel use case definitely makes why you need to do 
this here. Since this is an example, maybe just adding a comment noting this is 
the reason.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bruth commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
bruth commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187565770
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
 
 Review comment:
   Correct


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch asf-site updated: Updated site at revision e2b95f2

2018-05-11 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 18e9728  Updated site at revision e2b95f2
18e9728 is described below

commit 18e9728b2e04b3667ef71c47f5ad6e3de01228cb
Author: jenkins 
AuthorDate: Fri May 11 08:27:26 2018 +

Updated site at revision e2b95f2
---
 content/api/cpp/client_8h_source.html  | 2 +-
 content/api/cpp/consumer_8h_source.html| 2 +-
 content/api/cpp/consumer__configuration_8h_source.html | 2 +-
 content/api/cpp/message__id_8h_source.html | 2 +-
 content/api/cpp/message__router_8h_source.html | 2 +-
 content/api/cpp/producer_8h_source.html| 2 +-
 content/api/cpp/producer__configuration_8h_source.html | 2 +-
 content/api/cpp/reader_8h_source.html  | 2 +-
 content/api/cpp/reader__configuration_8h_source.html   | 2 +-
 content/docs/latest/reference/RestApi/index.html   | 4 ++--
 content/ja/reference/RestApi/index.html| 4 ++--
 11 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/content/api/cpp/client_8h_source.html 
b/content/api/cpp/client_8h_source.html
index b174f5e..31f1287 100644
--- a/content/api/cpp/client_8h_source.html
+++ b/content/api/cpp/client_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 client.h  
 
 
-1   20#pragma once   21   22#include 
pulsar/c/client_configuration.h1   20#pragma once   21   22#include 
pulsar/c/client_configuration.h
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/consumer_8h_source.html 
b/content/api/cpp/consumer_8h_source.html
index 47ffaa8..6b9cfb3 100644
--- a/content/api/cpp/consumer_8h_source.html
+++ b/content/api/cpp/consumer_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 consumer.h  
 
 
-1   19#pragma once   20   21#ifdef __cplusplus< [...]
+1   19#pragma once   20   21#ifdef __cplusplus< [...]
 
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/consumer__configuration_8h_source.html 
b/content/api/cpp/consumer__configuration_8h_source.html
index 1d5cf90..c64c8b9 100644
--- a/content/api/cpp/consumer__configuration_8h_source.html
+++ b/content/api/cpp/consumer__configuration_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 consumer_configuration.h  
 
 
-1   19#pragma once   20   21#ifdef __cplusplus< [...]
+1   19#pragma once   20   21#ifdef __cplusplus< [...]
 
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/message__id_8h_source.html 
b/content/api/cpp/message__id_8h_source.html
index 7bef8eb..914de55 100644
--- a/content/api/cpp/message__id_8h_source.html
+++ b/content/api/cpp/message__id_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 message_id.h  
 
 
-1   20#pragma once   21   22#ifdef __cplusplus< [...]
+1   20#pragma once   21   22#ifdef __cplusplus< [...]
 
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/message__router_8h_source.html 
b/content/api/cpp/message__router_8h_source.html
index ea281a0..f41f42e 100644
--- a/content/api/cpp/message__router_8h_source.html
+++ b/content/api/cpp/message__router_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 message_router.h  
 
 
-1   20#pragma once   21   22#include pulsar/c/message.h< [...]
+1   20#pragma once   21   22#include pulsar/c/message.h< [...]
 
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/producer_8h_source.html 
b/content/api/cpp/producer_8h_source.html
index 57bd8c1..7e6a50b 100644
--- a/content/api/cpp/producer_8h_source.html
+++ b/content/api/cpp/producer_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 producer.h  
 
 
-1   20#pragma once   21   22#ifdef __cplusplus< [...]
+1   20#pragma once   21   22#ifdef __cplusplus< [...]
 
 
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/producer__configuration_8h_source.html 
b/content/api/cpp/producer__configuration_8h_source.html
index 0135bd9..6c25bce 100644
--- a/content/api/cpp/producer__configuration_8h_source.html
+++ b/content/api/cpp/producer__configuration_8h_source.html
@@ -89,7 +89,7 @@ var searchBox = new SearchBox("searchBox", 
"search",false,'Search');
 producer_configuration.h  
 
 
-1   20#pragma once   21   22#include stdint.h1   20#pragma once   21   22#include stdint.h
 Generated by http://www.doxygen.org/index.html;>
diff --git a/content/api/cpp/reader_8h_source.html 

[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187536043
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
+   }
 
 Review comment:
   No, the reader needs to be managing the "message-id" manually, outside the 
scope of Pulsar, so no acks are needed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187535824
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
 
 Review comment:
   This is mostly to differentiate the API semantic difference between the 
consumer (push) and reader (pull).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187535910
 
 

 ##
 File path: pulsar-client-go/examples/reader/reader.go
 ##
 @@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   reader, err := 
client.NewReader().Topic("my-topic").StartFromEarliest().Create()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := reader.ReadNext()
 
 Review comment:
   (the underlying implementation for the delivery is 99% the same, in any case)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187535665
 
 

 ##
 File path: pulsar-client-go/examples/consumer/consumer.go
 ##
 @@ -0,0 +1,54 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   for {
+   msg, err := consumer.Receive()
 
 Review comment:
   `Receive()` would block (on a channel) until a message is available. there's 
also `ReceiveWithTimeout()`. Is that what you are referring with " per-message 
timeout " ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
srkukarni commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388279890
 
 
   Not really. Infact this is closer to Storm's KafkaSpout which is a pull model


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388279505
 
 
   @srkukarni but isn't that just the same as using the PushSource but instead 
you a pulling the queuing logic back into the KafkaSource?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187534640
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
 
 Review comment:
   yes, `URL` sounds better


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187534501
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
 
 Review comment:
   Yes, I was trying to use the struct approach for config but I could not find 
a good way to deal with integer defaults.
   
   Example: producer configuration has a "send timeout" int option, for which a 
sensible default is 30 seconds. The problem is that the struct default of 0 is 
a valid value too, so I couldn't find a way to differentiate a legit 0 from an 
uninitiated option


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
srkukarni commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388278263
 
 
   So the main interface is the Source<> interface which has a read() blocking 
call. Since kafka is also a pull interface, kafkasource can just basically 
extend source<> directly and implement read() as kafkaclient.poll(). The 
mechanics of internal buffering is for efficiency reasons, but it would be 
something like this
   read() {
   if bufferedQueue.empty  bufferedQueue = kafkaclient.poll(some reasonable 
number); 
   pop and return back;
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187534072
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   MessageListener(channel).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   // Receive messages from channel
+   for cm := range channel {
+   consumer := cm.Consumer
 
 Review comment:
   In this case we don't care about the consumer, because the message can only 
come from that same consumer. 
   
   However, it might be possible to use a single channel to receive messages 
from multiple consumers. That's the reason the channel contains a 
`ConsumerMessage` struct rather than a plain `Message`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187533673
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   MessageListener(channel).
 
 Review comment:
   Agree, I was using `MessageListener` since I was converting from Java 
interfaces, then I realized than using a channel was a better option in Go


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187533434
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
+   "fmt"
+   "log"
+)
+
+func main() {
+   client := pulsar.NewClient().
+   ServiceUrl("pulsar://localhost:6650").Build()
+
+   channel := make(chan pulsar.ConsumerMessage)
+
+   consumer, err := client.NewConsumer().
+   Topic("my-topic").
+   SubscriptionName("my-subscription").
+   SubscriptionType(pulsar.Shared).
+   MessageListener(channel).
+   Subscribe()
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   // Receive messages from channel
+   for cm := range channel {
+   consumer := cm.Consumer
+   msg := cm.Message
+   fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+   msg.MessageId(), string(msg.Payload()))
+
+   consumer.Acknowledge(msg)
+   }
+
+   consumer.Close()
 
 Review comment:
   Yes, makes sense to always use `defer`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
srkukarni commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388277539
 
 
   Can we keep the simple Function(Collection<>) interface that we have and 
still implement WindowExecutor client side? That way in cmdfunctions(at client 
side), you would pass classname as WindowExecutor with the user's class as one 
of the parameter for this


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1764: WIP - Pulsar Go client library

2018-05-11 Thread GitBox
merlimat commented on a change in pull request #1764: WIP - Pulsar Go client 
library
URL: https://github.com/apache/incubator-pulsar/pull/1764#discussion_r187533355
 
 

 ##
 File path: pulsar-client-go/examples/consumer-listener/consumer-listener.go
 ##
 @@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+   "../../pulsar"
 
 Review comment:
   Good point, should it point to fully qualified name here? like 
`github.com/apache/incubator-pulsar/pulsar-client-go/pulsar`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388275692
 
 
   @srkukarni  well I thought you didn't like that approach since 
WindowFunction would be an abstract class that users would extend.  It would 
simplify the server side.  This would become the same approach as PushSource in 
the other PR


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388277056
 
 
   @srkukarni using a push source might be more efficient since not we are 
doing async reads in a sense.  Remember the actual source called synchronously. 
 It can only process on tuple at a time


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388277056
 
 
   @srkukarni using a push source might be more efficient since not we are 
doing async reads in a sense.  Remember the actual source is synchronous.  It 
can only process on tuple at a time


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388277056
 
 
   @srkukarni using a push source might be more efficient since not we are 
doing async reads in a sense.  Remember the actual source synchronous.  It can 
only process on tuple at a time


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388277056
 
 
   @srkukarni user a push source might be more efficient since not we are doing 
async reads in a sense.  Remember the actual source synchronous.  It can only 
process on tuple at a time


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388276631
 
 
   @srkukarni are you suggesting we poll one tuple at a time?
   
   e.g.
   
   onsumerRecords = consumer.poll(1);
   
   is that efficient?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388275692
 
 
   @srkukarni  well I thought you didn't like that approach since 
WindowFunction would be an abstract method that users would extend.  It would 
simplify the server side.  This would become the same approach as PushSource in 
the other PR


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
srkukarni commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388275286
 
 
   Since this is a smallish change, would rather do it here itself? It actually 
simpifies the code of kafka source


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
srkukarni commented on issue #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#issuecomment-388275138
 
 
   One approach would be to make WindowExecutor function client side?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on a change in pull request #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on a change in pull request #1758: Windowing for Pulsar 
Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#discussion_r187530968
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 ##
 @@ -61,8 +67,21 @@ public JavaInstance(InstanceConfig config, Object 
userClassObject,
 // create the functions
 if (userClassObject instanceof Function) {
 this.function = (Function) userClassObject;
-} else {
-this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
+} else if (userClassObject instanceof java.util.function.Function) {
+Class[] typeArgs = TypeResolver.resolveRawArguments(
+java.util.function.Function.class, 
userClassObject.getClass());
+// check if window function
+if (typeArgs[0].equals(Collection.class)) {
 
 Review comment:
   though its a bit difficult to get the check if the windowconfig set since 
the windowconfig is client side and not on the server side


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on a change in pull request #1758: Windowing for Pulsar Functions

2018-05-11 Thread GitBox
jerrypeng commented on a change in pull request #1758: Windowing for Pulsar 
Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758#discussion_r187530714
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 ##
 @@ -61,8 +67,21 @@ public JavaInstance(InstanceConfig config, Object 
userClassObject,
 // create the functions
 if (userClassObject instanceof Function) {
 this.function = (Function) userClassObject;
-} else {
-this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
+} else if (userClassObject instanceof java.util.function.Function) {
+Class[] typeArgs = TypeResolver.resolveRawArguments(
+java.util.function.Function.class, 
userClassObject.getClass());
+// check if window function
+if (typeArgs[0].equals(Collection.class)) {
 
 Review comment:
   yup good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on a change in pull request #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on a change in pull request #1766:  Implementing PushSource 
on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#discussion_r187530060
 
 

 ##
 File path: 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
 ##
 @@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import lombok.Getter;
 
 Review comment:
   sure can we do that in a followup PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
srkukarni commented on a change in pull request #1766:  Implementing PushSource 
on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#discussion_r187529796
 
 

 ##
 File path: 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
 ##
 @@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import lombok.Getter;
 
 Review comment:
   kafka source is natively pull source. So we should actually implement it as 
source


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1767: check record returned by source is not null

2018-05-11 Thread GitBox
jerrypeng commented on issue #1767: check record returned by source is not null
URL: https://github.com/apache/incubator-pulsar/pull/1767#issuecomment-388272601
 
 
   @srkukarni @sijie please review


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1767: check record returned by source is not null

2018-05-11 Thread GitBox
jerrypeng opened a new pull request #1767: check record returned by source is 
not null
URL: https://github.com/apache/incubator-pulsar/pull/1767
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng commented on issue #1766:  Implementing PushSource on top of Source
URL: https://github.com/apache/incubator-pulsar/pull/1766#issuecomment-388271445
 
 
   @sijie @srkukarni please review


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1766: Implementing PushSource on top of Source

2018-05-11 Thread GitBox
jerrypeng opened a new pull request #1766:  Implementing PushSource on top of 
Source
URL: https://github.com/apache/incubator-pulsar/pull/1766
 
 
   Changes:
   
   1. Modified the PushSource interface to change to an abstract function
   2. Change 
   
   setConsumer(Function consumer) 
   ->
   setConsumer(Consumer consumer)
   
   Since record already has a ack method to indicate acknowledgement. We don't 
need to completable future mechanism any more


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services