[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-02-15 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r806705822



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.schema.KeyValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The serialization schema for how to serialize record into Pulsar.
+ *
+ * @param  The message type send to Pulsar.
+ */
+@PublicEvolving
+public interface PulsarSerializationSchema extends Serializable {
+
+/**
+ * Initialization method for the schema. It is called before the actual 
working methods {@link
+ * #serialize(Object, PulsarSinkContext)} and thus suitable for one time 
setup work.
+ *
+ * The provided {@link InitializationContext} can be used to access 
additional features such
+ * as e.g. registering user metrics.
+ *
+ * @param initializationContext Contextual information that can be used 
during initialization.
+ * @param sinkContext runtime information i.e. partitions, subtaskId
+ * @param sinkConfiguration All the configure options for Pulsar sink. You 
can add custom
+ * options.
+ */
+default void open(
+InitializationContext initializationContext,
+PulsarSinkContext sinkContext,
+SinkConfiguration sinkConfiguration)
+throws Exception {
+// Nothing to do by default.
+}
+
+/**
+ * Property {@link TypedMessageBuilder#key(String)}.
+ *
+ * @param element element to be serialized
+ * @param sinkContext context to provide extra information.
+ */
+default String key(IN element, PulsarSinkContext sinkContext) {
+return null;
+}
+
+/**
+ * Serializes given element into bytes. Property {@link 
TypedMessageBuilder#value(Object)}.
+ *
+ * @param element element to be serialized
+ * @param sinkContext context to provide extra information.
+ */
+byte[] serialize(IN element, PulsarSinkContext sinkContext);
+
+/** Setting message metadata which would rarely be used for normal users. 
*/
+default void metadata(MetadataBuilder metadataBuilder, 
PulsarSinkContext sinkContext) {

Review comment:
   Would it be possible to also pass the `element` as an argument to the
`metadata(...)` method? There may be use cases where users want to compute
metadata from the element itself, and passing it in would give them more
"food" to cook with XD. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-02-15 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r806705822



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.schema.KeyValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The serialization schema for how to serialize record into Pulsar.
+ *
+ * @param  The message type send to Pulsar.
+ */
+@PublicEvolving
+public interface PulsarSerializationSchema extends Serializable {
+
+/**
+ * Initialization method for the schema. It is called before the actual 
working methods {@link
+ * #serialize(Object, PulsarSinkContext)} and thus suitable for one time 
setup work.
+ *
+ * The provided {@link InitializationContext} can be used to access 
additional features such
+ * as e.g. registering user metrics.
+ *
+ * @param initializationContext Contextual information that can be used 
during initialization.
+ * @param sinkContext runtime information i.e. partitions, subtaskId
+ * @param sinkConfiguration All the configure options for Pulsar sink. You 
can add custom
+ * options.
+ */
+default void open(
+InitializationContext initializationContext,
+PulsarSinkContext sinkContext,
+SinkConfiguration sinkConfiguration)
+throws Exception {
+// Nothing to do by default.
+}
+
+/**
+ * Property {@link TypedMessageBuilder#key(String)}.
+ *
+ * @param element element to be serialized
+ * @param sinkContext context to provide extra information.
+ */
+default String key(IN element, PulsarSinkContext sinkContext) {
+return null;
+}
+
+/**
+ * Serializes given element into bytes. Property {@link 
TypedMessageBuilder#value(Object)}.
+ *
+ * @param element element to be serialized
+ * @param sinkContext context to provide extra information.
+ */
+byte[] serialize(IN element, PulsarSinkContext sinkContext);
+
+/** Setting message metadata which would rarely be used for normal users. 
*/
+default void metadata(MetadataBuilder metadataBuilder, 
PulsarSinkContext sinkContext) {

Review comment:
   Would it be possible to also pass the `element` as an argument to the
`metadata(...)` method? There may be use cases where users want to compute
metadata from the element itself, and passing it in would give them more
"food" to cook with XD. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-02-07 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r800452443



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import 
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import 
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle 
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param  The type of the input elements.
+ */
+public class PulsarWriter implements PrecommittingSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+private final SinkConfiguration sinkConfiguration;
+private final DeliveryGuarantee deliveryGuarantee;
+private final PulsarSerializationSchema serializationSchema;
+private final TopicRouter topicRouter;
+private final PulsarSinkContextAdapter sinkContextAdapter;
+private final TopicMetadataListener metadataListener;
+private final MailboxExecutor mailboxExecutor;
+private final TopicProducerRegister producerRegister;
+private final Semaphore pendingMessages;
+
+/**
+ * Constructor creating a Pulsar writer.
+ *
+ * It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext, 
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar 
producer.
+ * @param serializationSchema serialize to transform the incoming records 
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic 
by incoming records.
+ * @param initContext context to provide information about the runtime 
environment.
+ */
+public PulsarWriter(
+SinkConfiguration sinkConfiguration,
+PulsarSerializationSchema serializationSchema,
+TopicMetadataListener metadataListener,
+SerializableFunction> 
topicRouterProvider,
+InitContext initContext) {
+this.sinkConfiguration = sinkConfiguration;
+this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-02-07 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r800452443



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import 
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import 
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle 
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param  The type of the input elements.
+ */
+public class PulsarWriter implements PrecommittingSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+private final SinkConfiguration sinkConfiguration;
+private final DeliveryGuarantee deliveryGuarantee;
+private final PulsarSerializationSchema serializationSchema;
+private final TopicRouter topicRouter;
+private final PulsarSinkContextAdapter sinkContextAdapter;
+private final TopicMetadataListener metadataListener;
+private final MailboxExecutor mailboxExecutor;
+private final TopicProducerRegister producerRegister;
+private final Semaphore pendingMessages;
+
+/**
+ * Constructor creating a Pulsar writer.
+ *
+ * It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext, 
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar 
producer.
+ * @param serializationSchema serialize to transform the incoming records 
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic 
by incoming records.
+ * @param initContext context to provide information about the runtime 
environment.
+ */
+public PulsarWriter(
+SinkConfiguration sinkConfiguration,
+PulsarSerializationSchema serializationSchema,
+TopicMetadataListener metadataListener,
+SerializableFunction> 
topicRouterProvider,
+InitContext initContext) {
+this.sinkConfiguration = sinkConfiguration;
+this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-26 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r793300636



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
##
@@ -97,16 +94,14 @@
  * PulsarSourceBuilder}.
  */
 PulsarSource(
-Configuration configuration,
+SourceConfiguration sourceConfiguration,

Review comment:
   Regarding the source-related changes: I guess they are tied to the sink
work because we want to align the naming conventions and class hierarchy, but
could we move them into a separate commit?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-26 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r793289032



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+
+import org.apache.pulsar.client.api.Schema;
+
+/** Wrap the Pulsar's Schema into PulsarSerializationSchema. */
+@Internal
+public class PulsarSchemaWrapper implements PulsarSerializationSchema {
+private static final long serialVersionUID = -2567052498398184194L;
+
+private static final byte[] EMPTY_BYTES = new byte[0];
+private final PulsarSchema pulsarSchema;
+
+public PulsarSchemaWrapper(PulsarSchema pulsarSchema) {
+this.pulsarSchema = pulsarSchema;
+}
+
+@Override
+public RawMessage serialize(IN element, PulsarSinkContext 
sinkContext) {
+RawMessage message;
+
+if (sinkContext.isEnableSchemaEvolution()) {
+// We don't need to serialize incoming records in schema evolution.
+message = new RawMessage<>(EMPTY_BYTES);
+} else {
+Schema schema = this.pulsarSchema.getPulsarSchema();
+byte[] bytes = schema.encode(element);
+message = new RawMessage<>(bytes);
+}
+
+Long eventTime = sinkContext.timestamp();

Review comment:
   Here we set the event time during serialization. What were the design
considerations behind this? I'm asking because it feels like setting the event
time could happen in a separate step.

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient;
+import s

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-26 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r792717796



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import 
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchemaInitializationContext;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle 
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param  The type of the input elements.
+ */
+@Internal
+public class PulsarWriter implements SinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+private final SinkConfiguration sinkConfiguration;
+private final DeliveryGuarantee deliveryGuarantee;
+private final PulsarSerializationSchema serializationSchema;
+private final TopicRouter topicRouter;
+private final PulsarSinkContextAdapter sinkContextAdapter;
+private final TopicMetadataListener metadataListener;
+private final MailboxExecutor mailboxExecutor;
+private final TopicProducerRegister producerRegister;
+private final Semaphore pendingMessages;
+
+/**
+ * Constructor creating a Pulsar writer.
+ *
+ * It will throw a {@link RuntimeException} if {@link
+ * 
PulsarSerializationSchema#open(SerializationSchema.InitializationContext, 
PulsarSinkContext,
+ * SinkConfiguration)} fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar 
producer.
+ * @param deliveryGuarantee the Sink's delivery guarantee.
+ * @param serializationSchema serialize to transform the incoming records 
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic 
by incoming records.
+ * @param initContext context to provide information about the runtime 
environment.
+ */
+public PulsarWriter(
+SinkConfiguration sinkConfiguration,
+DeliveryGuarantee deliveryGuarantee,
+PulsarSerializationSchema serializationSchema,
+TopicMetadataListener metadataListener,
+SerializableFu

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-26 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r792420132



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
##
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSink} to make it easier for the users to 
construct a {@link
+ * PulsarSink}.
+ *
+ * The following example shows the minimum setup to create a PulsarSink 
that writes String
+ * values to a Pulsar topic.
+ *
+ * {@code
+ * PulsarSink sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * 
.setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .build();
+ * }
+ *
+ * The service url, admin url, and the record serializer are required 
fields that must be set. If
+ * you don't set the topics, make sure you have provided a custom {@link 
TopicRouter}. Otherwise,
+ * you must provide the topics to produce.
+ *
+ * To specify the delivery guarantees of PulsarSink, one can call {@link
+ * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the 
delivery guarantee is {@link
+ * DeliveryGuarantee#EXACTLY_ONCE}, and it requires the Pulsar broker to turn 
on transaction
+ * support.
+ *
+ * {@code
+ * PulsarSink sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * 
.setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .setDeliveryGuarantee(deliveryGuarantee)
+ * .build();
+ * }
+ *
+ * @see PulsarSink for a more detailed explanation of the different guarantees.
+ * @param  The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSinkBuilder {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+private final PulsarConfigBuilder configBuilder;
+
+private DeliveryGuarantee delive

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-25 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r791349039



##
File path: flink-connectors/flink-connector-pulsar/pom.xml
##
@@ -163,13 +202,22 @@ under the License.


 
-   


+   

io.grpc
grpc-bom
-   ${grpc.version}
+   ${pulsar-grpc.version}
+   pom
+   import

Review comment:
   We use the `import` scope to align the grpc version with whatever grpc 
version Pulsar uses; is my understanding correct?

##
File path: flink-connectors/flink-connector-pulsar/pom.xml
##
@@ -138,23 +140,60 @@ under the License.
${pulsar.version}
test

+



org.apache.commons
commons-lang3
-   ${commons-lang3.version}
+   ${pulsar-commons-lang3.version}
+   test
+   
+
+   
+   
+   
+   org.apache.zookeeper
+   zookeeper
+   ${pulsar-zookeeper.version}
test

 


-

org.apache.pulsar
pulsar-client-all
${pulsar.version}

+   
+   com.sun.activation

Review comment:
   Wondering why we need to exclude these dependencies?

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##
@@ -94,6 +100,11 @@ private PulsarConfigUtils() {
 public static PulsarClient createClient(Configuration configuration) {
 ClientBuilder builder = PulsarClient.builder();
 
+// requestTimeoutMs doesn't have a setter method on ClientBuilder. We 
have to use the low-level
+// setter method instead, so we put this at the beginning of the 
builder.
+Integer requestTimeoutMs = 
configuration.get(PULSAR_REQUEST_TIMEOUT_MS);
+builder.loadConf(singletonMap("requestTimeoutMs", requestTimeoutMs));

Review comment:
   nit: just a reminder to add this to the docs (if not already done).

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState;
+import 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A Pulsar sink implementation.
+ *
+ * @param  record data type.
+ */
+@PublicEvolving
+public class PulsarSink implements Sink {
+
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final TopicSelector topicSelector;
+

[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-24 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r791349039



##
File path: flink-connectors/flink-connector-pulsar/pom.xml
##
@@ -163,13 +202,22 @@ under the License.


 
-   


+   

io.grpc
grpc-bom
-   ${grpc.version}
+   ${pulsar-grpc.version}
+   pom
+   import

Review comment:
   We use the `import` scope to align the grpc version with whatever grpc 
version Pulsar uses; is my understanding correct?

##
File path: flink-connectors/flink-connector-pulsar/pom.xml
##
@@ -138,23 +140,60 @@ under the License.
${pulsar.version}
test

+



org.apache.commons
commons-lang3
-   ${commons-lang3.version}
+   ${pulsar-commons-lang3.version}
+   test
+   
+
+   
+   
+   
+   org.apache.zookeeper
+   zookeeper
+   ${pulsar-zookeeper.version}
test

 


-

org.apache.pulsar
pulsar-client-all
${pulsar.version}

+   
+   com.sun.activation

Review comment:
   Wondering why we need to exclude these dependencies?

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##
@@ -94,6 +100,11 @@ private PulsarConfigUtils() {
 public static PulsarClient createClient(Configuration configuration) {
 ClientBuilder builder = PulsarClient.builder();
 
+// requestTimeoutMs doesn't have a setter method on ClientBuilder. We 
have to use the low-level
+// setter method instead, so we put this at the beginning of the 
builder.
+Integer requestTimeoutMs = 
configuration.get(PULSAR_REQUEST_TIMEOUT_MS);
+builder.loadConf(singletonMap("requestTimeoutMs", requestTimeoutMs));

Review comment:
   nit: just a reminder to add this to the docs (if not already done).

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState;
+import 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A Pulsar sink implementation.
+ *
+ * @param  record data type.
+ */
+@PublicEvolving
+public class PulsarSink implements Sink {
+
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final TopicSelector topicSelector;
+