[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r806705822 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.schema.KeyValue; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * The serialization schema for how to serialize record into Pulsar. 
+ * + * @param The message type send to Pulsar. + */ +@PublicEvolving +public interface PulsarSerializationSchema extends Serializable { + +/** + * Initialization method for the schema. It is called before the actual working methods {@link + * #serialize(Object, PulsarSinkContext)} and thus suitable for one time setup work. + * + * The provided {@link InitializationContext} can be used to access additional features such + * as e.g. registering user metrics. + * + * @param initializationContext Contextual information that can be used during initialization. + * @param sinkContext runtime information i.e. partitions, subtaskId + * @param sinkConfiguration All the configure options for Pulsar sink. You can add custom + * options. + */ +default void open( +InitializationContext initializationContext, +PulsarSinkContext sinkContext, +SinkConfiguration sinkConfiguration) +throws Exception { +// Nothing to do by default. +} + +/** + * Property {@link TypedMessageBuilder#key(String)}. + * + * @param element element to be serialized + * @param sinkContext context to provide extra information. + */ +default String key(IN element, PulsarSinkContext sinkContext) { +return null; +} + +/** + * Serializes given element into bytes. Property {@link TypedMessageBuilder#value(Object)}. + * + * @param element element to be serialized + * @param sinkContext context to provide extra information. + */ +byte[] serialize(IN element, PulsarSinkContext sinkContext); + +/** Setting message metadata which would rarely be used for normal users. */ +default void metadata(MetadataBuilder metadataBuilder, PulsarSinkContext sinkContext) { Review comment: Is it possible to pass `element` as an argument to the `metadata(...)` method? There may be use cases where users want to compute metadata from the element itself, and passing the element here would give them more information to work with. WDYT? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r806705822 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.schema.KeyValue; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * The serialization schema for how to serialize record into Pulsar. 
+ * + * @param The message type send to Pulsar. + */ +@PublicEvolving +public interface PulsarSerializationSchema extends Serializable { + +/** + * Initialization method for the schema. It is called before the actual working methods {@link + * #serialize(Object, PulsarSinkContext)} and thus suitable for one time setup work. + * + * The provided {@link InitializationContext} can be used to access additional features such + * as e.g. registering user metrics. + * + * @param initializationContext Contextual information that can be used during initialization. + * @param sinkContext runtime information i.e. partitions, subtaskId + * @param sinkConfiguration All the configure options for Pulsar sink. You can add custom + * options. + */ +default void open( +InitializationContext initializationContext, +PulsarSinkContext sinkContext, +SinkConfiguration sinkConfiguration) +throws Exception { +// Nothing to do by default. +} + +/** + * Property {@link TypedMessageBuilder#key(String)}. + * + * @param element element to be serialized + * @param sinkContext context to provide extra information. + */ +default String key(IN element, PulsarSinkContext sinkContext) { +return null; +} + +/** + * Serializes given element into bytes. Property {@link TypedMessageBuilder#value(Object)}. + * + * @param element element to be serialized + * @param sinkContext context to provide extra information. + */ +byte[] serialize(IN element, PulsarSinkContext sinkContext); + +/** Setting message metadata which would rarely be used for normal users. */ +default void metadata(MetadataBuilder metadataBuilder, PulsarSinkContext sinkContext) { Review comment: Is it possible to pass `element` as an argument to the `metadata(...)` method? There may be use cases where users want to compute metadata from the element itself, and passing the element here would give them more information to work with. WDYT? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r800452443 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.SerializableFunction; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; + +/** + * This class is responsible to write records in a Pulsar topic and to handle the different 
delivery + * {@link DeliveryGuarantee}s. + * + * @param The type of the input elements. + */ +public class PulsarWriter implements PrecommittingSinkWriter { +private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + +private final SinkConfiguration sinkConfiguration; +private final DeliveryGuarantee deliveryGuarantee; +private final PulsarSerializationSchema serializationSchema; +private final TopicRouter topicRouter; +private final PulsarSinkContextAdapter sinkContextAdapter; +private final TopicMetadataListener metadataListener; +private final MailboxExecutor mailboxExecutor; +private final TopicProducerRegister producerRegister; +private final Semaphore pendingMessages; + +/** + * Constructor creating a Pulsar writer. + * + * It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)} + * fails. + * + * @param sinkConfiguration the configuration to configure the Pulsar producer. + * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}. + * @param metadataListener the listener for querying topic metadata. + * @param topicRouterProvider create related topic router to choose topic by incoming records. + * @param initContext context to provide information about the runtime environment. + */ +public PulsarWriter( +SinkConfiguration sinkConfiguration, +PulsarSerializationSchema serializationSchema, +TopicMetadataListener metadataListener, +SerializableFunction> topicRouterProvider, +InitContext initContext) { +this.sinkConfiguration = sinkConfiguration; +this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee(); +
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r800452443 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.SerializableFunction; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; + +/** + * This class is responsible to write records in a Pulsar topic and to handle the different 
delivery + * {@link DeliveryGuarantee}s. + * + * @param The type of the input elements. + */ +public class PulsarWriter implements PrecommittingSinkWriter { +private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + +private final SinkConfiguration sinkConfiguration; +private final DeliveryGuarantee deliveryGuarantee; +private final PulsarSerializationSchema serializationSchema; +private final TopicRouter topicRouter; +private final PulsarSinkContextAdapter sinkContextAdapter; +private final TopicMetadataListener metadataListener; +private final MailboxExecutor mailboxExecutor; +private final TopicProducerRegister producerRegister; +private final Semaphore pendingMessages; + +/** + * Constructor creating a Pulsar writer. + * + * It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)} + * fails. + * + * @param sinkConfiguration the configuration to configure the Pulsar producer. + * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}. + * @param metadataListener the listener for querying topic metadata. + * @param topicRouterProvider create related topic router to choose topic by incoming records. + * @param initContext context to provide information about the runtime environment. + */ +public PulsarWriter( +SinkConfiguration sinkConfiguration, +PulsarSerializationSchema serializationSchema, +TopicMetadataListener metadataListener, +SerializableFunction> topicRouterProvider, +InitContext initContext) { +this.sinkConfiguration = sinkConfiguration; +this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee(); +
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r793300636 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java ## @@ -97,16 +94,14 @@ * PulsarSourceBuilder}. */ PulsarSource( -Configuration configuration, +SourceConfiguration sourceConfiguration, Review comment: Regarding the source-related changes: I guess they are tied to the sink work because we want to align the naming conventions and class hierarchy, but maybe consider putting them in a separate commit? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r793289032 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; + +import org.apache.pulsar.client.api.Schema; + +/** Wrap the Pulsar's Schema into PulsarSerializationSchema. 
*/ +@Internal +public class PulsarSchemaWrapper implements PulsarSerializationSchema { +private static final long serialVersionUID = -2567052498398184194L; + +private static final byte[] EMPTY_BYTES = new byte[0]; +private final PulsarSchema pulsarSchema; + +public PulsarSchemaWrapper(PulsarSchema pulsarSchema) { +this.pulsarSchema = pulsarSchema; +} + +@Override +public RawMessage serialize(IN element, PulsarSinkContext sinkContext) { +RawMessage message; + +if (sinkContext.isEnableSchemaEvolution()) { +// We don't need to serialize incoming records in schema evolution. +message = new RawMessage<>(EMPTY_BYTES); +} else { +Schema schema = this.pulsarSchema.getPulsarSchema(); +byte[] bytes = schema.encode(element); +message = new RawMessage<>(bytes); +} + +Long eventTime = sinkContext.timestamp(); Review comment: Here we set the event time during serialization. What were the design considerations behind this? I'm asking because it feels like adding the event time could happen in a separate step. ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.pulsar.sink.committer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.PulsarSink; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyList; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient; +import s
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r792717796 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Sink.InitContext; +import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchemaInitializationContext; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.SerializableFunction; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; + +/** + * This class is 
responsible to write records in a Pulsar topic and to handle the different delivery + * {@link DeliveryGuarantee}s. + * + * @param The type of the input elements. + */ +@Internal +public class PulsarWriter implements SinkWriter { +private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + +private final SinkConfiguration sinkConfiguration; +private final DeliveryGuarantee deliveryGuarantee; +private final PulsarSerializationSchema serializationSchema; +private final TopicRouter topicRouter; +private final PulsarSinkContextAdapter sinkContextAdapter; +private final TopicMetadataListener metadataListener; +private final MailboxExecutor mailboxExecutor; +private final TopicProducerRegister producerRegister; +private final Semaphore pendingMessages; + +/** + * Constructor creating a Pulsar writer. + * + * It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(SerializationSchema.InitializationContext, PulsarSinkContext, + * SinkConfiguration)} fails. + * + * @param sinkConfiguration the configuration to configure the Pulsar producer. + * @param deliveryGuarantee the Sink's delivery guarantee. + * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}. + * @param metadataListener the listener for querying topic metadata. + * @param topicRouterProvider create related topic router to choose topic by incoming records. + * @param initContext context to provide information about the runtime environment. + */ +public PulsarWriter( +SinkConfiguration sinkConfiguration, +DeliveryGuarantee deliveryGuarantee, +PulsarSerializationSchema serializationSchema, +TopicMetadataListener metadataListener, +SerializableFu
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r792420132 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java ## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; + +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT; +import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics; +import static org.apache.flink.util.Preconditions.checkArgument; +import static 
org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link + * PulsarSink}. + * + * The following example shows the minimum setup to create a PulsarSink that reads the String + * values from a Pulsar topic. + * + * {@code + * PulsarSink sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .build(); + * } + * + * The service url, admin url, and the record serializer are required fields that must be set. If + * you don't set the topics, make sure you have provided a custom {@link TopicRouter}. Otherwise, + * you must provide the topics to produce. + * + * To specify the delivery guarantees of PulsarSink, one can call {@link + * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link + * DeliveryGuarantee#EXACTLY_ONCE}, and it requires the Pulsar broker to turn on transaction + * support. + * + * {@code + * PulsarSink sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .setDeliveryGuarantee(deliveryGuarantee) + * .build(); + * } + * + * @see PulsarSink for a more detailed explanation of the different guarantees. + * @param The input type of the sink. + */ +@PublicEvolving +public class PulsarSinkBuilder { +private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class); + +private final PulsarConfigBuilder configBuilder; + +private DeliveryGuarantee delive
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r791349039 ## File path: flink-connectors/flink-connector-pulsar/pom.xml ## @@ -163,13 +202,22 @@ under the License. - + io.grpc grpc-bom - ${grpc.version} + ${pulsar-grpc.version} + pom + import Review comment: We use `import` scope to align the grpc version with whatever Pulsar's grpc version is; is my understanding correct? ## File path: flink-connectors/flink-connector-pulsar/pom.xml ## @@ -138,23 +140,60 @@ under the License. ${pulsar.version} test + org.apache.commons commons-lang3 - ${commons-lang3.version} + ${pulsar-commons-lang3.version} + test + + + + + + org.apache.zookeeper zookeeper + ${pulsar-zookeeper.version} test - org.apache.pulsar pulsar-client-all ${pulsar.version} + + com.sun.activation Review comment: Wondering why we need to exclude these dependencies? ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java ## @@ -94,6 +100,11 @@ private PulsarConfigUtils() { public static PulsarClient createClient(Configuration configuration) { ClientBuilder builder = PulsarClient.builder(); +// requestTimeoutMs don't have a setter method on ClientBuilder. We have to use low level +// setter method instead. So we put this at the beginning of the builder. +Integer requestTimeoutMs = configuration.get(PULSAR_REQUEST_TIMEOUT_MS); +builder.loadConf(singletonMap("requestTimeoutMs", requestTimeoutMs)); Review comment: nit: just a reminder to add this to the docs (if not already done). ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer; +import org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector; +import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * a pulsar Sink implement. + * + * @param record data type. + */ +@PublicEvolving +public class PulsarSink implements Sink { + +private final DeliveryGuarantee deliveryGuarantee; + +private final TopicSelector topicSelector; +
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r791349039 ## File path: flink-connectors/flink-connector-pulsar/pom.xml ## @@ -163,13 +202,22 @@ under the License. - + io.grpc grpc-bom - ${grpc.version} + ${pulsar-grpc.version} + pom + import Review comment: We use `import` scope to align the grpc version with whatever Pulsar's grpc version is; is my understanding correct? ## File path: flink-connectors/flink-connector-pulsar/pom.xml ## @@ -138,23 +140,60 @@ under the License. ${pulsar.version} test + org.apache.commons commons-lang3 - ${commons-lang3.version} + ${pulsar-commons-lang3.version} + test + + + + + + org.apache.zookeeper zookeeper + ${pulsar-zookeeper.version} test - org.apache.pulsar pulsar-client-all ${pulsar.version} + + com.sun.activation Review comment: Wondering why we need to exclude these dependencies? ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java ## @@ -94,6 +100,11 @@ private PulsarConfigUtils() { public static PulsarClient createClient(Configuration configuration) { ClientBuilder builder = PulsarClient.builder(); +// requestTimeoutMs don't have a setter method on ClientBuilder. We have to use low level +// setter method instead. So we put this at the beginning of the builder. +Integer requestTimeoutMs = configuration.get(PULSAR_REQUEST_TIMEOUT_MS); +builder.loadConf(singletonMap("requestTimeoutMs", requestTimeoutMs)); Review comment: nit: just a reminder to add this to the docs (if not already done). ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer; +import org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector; +import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * a pulsar Sink implement. + * + * @param record data type. + */ +@PublicEvolving +public class PulsarSink implements Sink { + +private final DeliveryGuarantee deliveryGuarantee; + +private final TopicSelector topicSelector; +