This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad03a07  Add pulsar flink sink connector (#2434)
ad03a07 is described below

commit ad03a07d5f5d93654f1d5afb8694989db5688095
Author: Ali Ahmed <alahmed...@gmail.com>
AuthorDate: Tue Sep 4 11:59:03 2018 -0700

    Add pulsar flink sink connector (#2434)
    
    @XiaoZYang introduces a module for a Pulsar sink connector for Flink. This PR moves the work from the Flink repo to the Pulsar repo, so the Flink connector can be released faster along with Pulsar releases.
    
    Original Flink Github Issue: apache/flink#5845
    Jira Issue: https://issues.apache.org/jira/browse/FLINK-9168
    
    Original Author: @XiaoZYang (Zong Yang Xiao)
---
 pom.xml                                            |   3 +
 pulsar-flink/pom.xml                               | 137 +++++++++
 .../connectors/pulsar/FlinkPulsarProducer.java     | 314 +++++++++++++++++++++
 .../connectors/pulsar/PulsarJsonTableSink.java     |  61 ++++
 .../connectors/pulsar/PulsarProduceMode.java       |  37 +++
 .../connectors/pulsar/PulsarTableSink.java         | 163 +++++++++++
 .../pulsar/partitioner/PulsarKeyExtractor.java     |  36 +++
 .../pulsar/serde/JsonRowDeserializationSchema.java | 130 +++++++++
 .../pulsar/serde/JsonRowSerializationSchema.java   |  92 ++++++
 9 files changed, 973 insertions(+)
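
For orientation, a minimal usage sketch of the new sink in a Flink streaming
job (not part of this commit; the service URL, topic name, and input data are
placeholder assumptions):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
    import org.apache.pulsar.client.api.ProducerConfiguration;

    public class PulsarSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing must be enabled for flush-on-checkpoint to take effect.
            env.enableCheckpointing(5000);

            FlinkPulsarProducer<String> sink = new FlinkPulsarProducer<>(
                    "pulsar://localhost:6650",                        // placeholder service URL
                    "persistent://sample/standalone/ns1/flink-topic", // placeholder topic
                    new SimpleStringSchema(),                         // SerializationSchema<String>
                    new ProducerConfiguration(),                      // default producer settings
                    null);                                            // null -> PulsarKeyExtractor.NULL

            sink.setFlushOnCheckpoint(true);

            env.fromElements("a", "b", "c")
                    .addSink(sink);

            env.execute("pulsar-sink-example");
        }
    }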

diff --git a/pom.xml b/pom.xml
index 4bb3cf6..1ed2802 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-proxy</module>
     <module>pulsar-discovery-service</module>
     <module>pulsar-storm</module>
+    <module>pulsar-flink</module>
     <module>pulsar-spark</module>
     <module>pulsar-zookeeper-utils</module>
     <module>pulsar-testclient</module>
@@ -167,6 +168,8 @@ flexible messaging model and an intuitive client API.</description>
     <avro.version>1.8.2</avro.version>
     <jclouds.version>2.1.1</jclouds.version>
     <presto.version>0.206</presto.version>
+    <flink.version>1.6.0</flink.version>
+    <scala.binary.version>2.11</scala.binary.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
new file mode 100644
index 0000000..97ab4fb
--- /dev/null
+++ b/pulsar-flink/pom.xml
@@ -0,0 +1,137 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-flink</artifactId>
+  <name>Pulsar Flink Connectors</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+      <!-- Projects depending on this project won't depend on flink-table. -->
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-tests_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.javassist</groupId>
+      <artifactId>javassist</artifactId>
+      <version>3.20.0-GA</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <relocations>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>pulsar-flink-shade.com.google</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
new file mode 100644
index 0000000..bddfee4
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.util.SerializableObject;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to produce data into a Pulsar topic.
+ */
+public class FlinkPulsarProducer<IN>
+        extends RichSinkFunction<IN>
+        implements CheckpointedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
+
+    /**
+     * The pulsar service url.
+     */
+    protected final String serviceUrl;
+
+    /**
+     * User defined configuration for the producer.
+     */
+    protected final ProducerConfiguration producerConfig;
+
+    /**
+     * The name of the default topic this producer is writing data to.
+     */
+    protected final String defaultTopicName;
+
+    /**
+     * (Serializable) SerializationSchema for turning objects used with Flink
+     * into byte[] for Pulsar.
+     */
+    protected final SerializationSchema<IN> schema;
+
+    /**
+     * User-provided key extractor for assigning a key to a pulsar message.
+     */
+    protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+
+    /**
+     * Produce Mode.
+     */
+    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+
+    /**
+     * If true, the producer will wait until all outstanding records have been sent to the broker.
+     */
+    protected boolean flushOnCheckpoint;
+
+    // -------------------------------- Runtime fields ------------------------------------------
+
+    /**
+     * Pulsar Producer instance.
+     */
+    protected transient Producer producer;
+
+    /**
+     * The callback that handles error propagation or logging.
+     */
+    protected transient Function<MessageId, MessageId> successCallback = msgId -> {
+        acknowledgeMessage();
+        return msgId;
+    };
+
+    protected transient Function<Throwable, MessageId> failureCallback;
+
+    /**
+     * Errors encountered in the async producer are stored here.
+     */
+    protected transient volatile Exception asyncException;
+
+    /**
+     * Lock for accessing the pending records.
+     */
+    protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+    /**
+     * Number of unacknowledged records.
+     */
+    protected long pendingRecords;
+
+    public FlinkPulsarProducer(String serviceUrl,
+                               String defaultTopicName,
+                               SerializationSchema<IN> serializationSchema,
+                               ProducerConfiguration producerConfig,
+                               PulsarKeyExtractor<IN> keyExtractor) {
+        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+        this.defaultTopicName = checkNotNull(defaultTopicName, "TopicName not set");
+        this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
+        this.producerConfig = checkNotNull(producerConfig, "Producer Config is not set");
+        this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+        ClosureCleaner.ensureSerializable(serializationSchema);
+    }
+
+    // ---------------------------------- Properties --------------------------
+
+
+    /**
+     * @return pulsar key extractor.
+     */
+    public PulsarKeyExtractor<IN> getKeyExtractor() {
+        return flinkPulsarKeyExtractor;
+    }
+
+    /**
+     * Gets this producer's operating mode.
+     */
+    public PulsarProduceMode getProduceMode() {
+        return this.produceMode;
+    }
+
+    /**
+     * Sets this producer's operating mode.
+     *
+     * @param produceMode The mode of operation.
+     */
+    public void setProduceMode(PulsarProduceMode produceMode) {
+        this.produceMode = checkNotNull(produceMode);
+    }
+
+    /**
+     * If set to true, the Flink producer will wait for all outstanding messages in the
+     * Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint.
+     * This way, the producer can guarantee that messages in the Pulsar buffers are part
+     * of the checkpoint.
+     *
+     * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+     */
+    public void setFlushOnCheckpoint(boolean flush) {
+        this.flushOnCheckpoint = flush;
+    }
+
+    // ----------------------------------- Sink Methods --------------------------
+
+    @SuppressWarnings("unchecked")
+    private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
+        if (null == extractor) {
+            return PulsarKeyExtractor.NULL;
+        } else {
+            return extractor;
+        }
+    }
+
+    private Producer createProducer(ProducerConfiguration configuration) throws Exception {
+        PulsarClient client = PulsarClient.create(serviceUrl);
+        return client.createProducer(defaultTopicName, configuration);
+    }
+
+    /**
+     * Initializes the connection to pulsar.
+     *
+     * @param parameters configuration used for initialization
+     * @throws Exception
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.producer = createProducer(producerConfig);
+
+        RuntimeContext ctx = getRuntimeContext();
+
+        LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
+                ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicName);
+
+        if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+            flushOnCheckpoint = false;
+        }
+
+        if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
+            this.failureCallback = cause -> {
+                LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
+                return null;
+            };
+        } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) {
+            this.failureCallback = cause -> {
+                if (null == asyncException) {
+                    if (cause instanceof Exception) {
+                        asyncException = (Exception) cause;
+                    } else {
+                        asyncException = new Exception(cause);
+                    }
+                }
+                return null;
+            };
+        } else {
+            throw new UnsupportedOperationException("Unsupported produce mode " + produceMode);
+        }
+    }
+
+    @Override
+    public void invoke(IN value, Context context) throws Exception {
+        checkErroneous();
+
+        byte[] serializedValue = schema.serialize(value);
+
+        MessageBuilder msgBuilder = MessageBuilder.create();
+        if (null != context.timestamp()) {
+            msgBuilder = msgBuilder.setEventTime(context.timestamp());
+        }
+        String msgKey = flinkPulsarKeyExtractor.getKey(value);
+        if (null != msgKey) {
+            msgBuilder = msgBuilder.setKey(msgKey);
+        }
+        Message message = msgBuilder
+                .setContent(serializedValue)
+                .build();
+
+        if (flushOnCheckpoint) {
+            synchronized (pendingRecordsLock) {
+                pendingRecords++;
+            }
+        }
+        producer.sendAsync(message)
+                .thenApply(successCallback)
+                .exceptionally(failureCallback);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+
+        // make sure we propagate pending errors
+        checkErroneous();
+    }
+
+    // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+    private void acknowledgeMessage() {
+        if (flushOnCheckpoint) {
+            synchronized (pendingRecordsLock) {
+                pendingRecords--;
+                if (pendingRecords == 0) {
+                    pendingRecordsLock.notifyAll();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // check for asynchronous errors and fail the checkpoint if necessary
+        checkErroneous();
+
+        if (flushOnCheckpoint) {
+            // wait until all the messages are acknowledged
+            synchronized (pendingRecordsLock) {
+                while (pendingRecords > 0) {
+                    pendingRecordsLock.wait(100);
+                }
+            }
+
+            // if the flushed requests encountered errors, propagate them as well and fail the checkpoint
+            checkErroneous();
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // nothing to do
+    }
+
+    // ----------------------------------- Utilities --------------------------
+
+    protected void checkErroneous() throws Exception {
+        Exception e = asyncException;
+        if (e != null) {
+            // prevent double throwing
+            asyncException = null;
+            throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
+        }
+    }
+
+}
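
As open() above shows, the two delivery knobs interact: flush-on-checkpoint is
silently disabled when checkpointing is off, and AT_MOST_ONCE only logs send
failures, while AT_LEAST_ONE stores them and rethrows on the next invoke() or
snapshotState(), failing the checkpoint so records are replayed. A
configuration sketch, reusing the imports and placeholder values from the
earlier example:

    FlinkPulsarProducer<String> producer = new FlinkPulsarProducer<>(
            "pulsar://localhost:6650",                // placeholder service URL
            "persistent://sample/standalone/ns1/out", // placeholder topic
            new SimpleStringSchema(),
            new ProducerConfiguration(),
            null);

    // AT_LEAST_ONE is the default; shown here for clarity.
    producer.setProduceMode(PulsarProduceMode.AT_LEAST_ONE);

    // Blocks snapshotState() until all pending sends are acknowledged. Requires
    // env.enableCheckpointing(...), otherwise open() disables it with a warning.
    producer.setFlushOnCheckpoint(true);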
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
new file mode 100644
index 0000000..1a8d5e3
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Base class for {@link PulsarTableSink} that serializes data in JSON format.
+ */
+public class PulsarJsonTableSink extends PulsarTableSink {
+
+    /**
+     * Create PulsarJsonTableSink.
+     *
+     * @param serviceUrl          pulsar service url
+     * @param topic               topic in pulsar to which table is written
+     * @param producerConf        producer configuration
+     * @param routingKeyFieldName routing key field name
+     */
+    public PulsarJsonTableSink(
+            String serviceUrl,
+            String topic,
+            ProducerConfiguration producerConf,
+            String routingKeyFieldName) {
+        super(serviceUrl, topic, producerConf, routingKeyFieldName);
+    }
+
+    @Override
+    protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+        return new JsonRowSerializationSchema(rowSchema);
+    }
+
+    @Override
+    protected PulsarTableSink createSink() {
+        return new PulsarJsonTableSink(
+                serviceUrl,
+                topic,
+                producerConf,
+                routingKeyFieldName);
+    }
+}
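
A sketch of how the JSON table sink might be wired into the Flink 1.6 Table
API; the "orders" table, its fields, and the connection values are assumptions
made for illustration:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.pulsar.client.api.ProducerConfiguration;

    public class PulsarTableSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            PulsarJsonTableSink sink = new PulsarJsonTableSink(
                    "pulsar://localhost:6650",                      // placeholder service URL
                    "persistent://sample/standalone/ns1/table-out", // placeholder topic
                    new ProducerConfiguration(),
                    "userId"); // routing key field; must be STRING in the output schema

            // The planner calls configure() with the query schema and emits into
            // the configured copy; "orders" must already be registered.
            tableEnv.sqlQuery("SELECT userId, amount FROM orders")
                    .writeToSink(sink);

            env.execute("pulsar-table-sink-example");
        }
    }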
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
new file mode 100644
index 0000000..d1b9fd8
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * The supported produce modes of operation for Flink's Pulsar producer.
+ */
+public enum PulsarProduceMode {
+
+    /**
+     * Any produce failures will be ignored; hence there could be data loss.
+     */
+    AT_MOST_ONCE,
+
+    /**
+     * The producer will ensure that all the events are persisted in pulsar.
+     * There could be duplicate events written though.
+     */
+    AT_LEAST_ONE,
+
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
new file mode 100644
index 0000000..faf420f
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pulsar stream.
+ */
+public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
+
+    protected final String serviceUrl;
+    protected final String topic;
+    protected final ProducerConfiguration producerConf;
+    protected SerializationSchema<Row> serializationSchema;
+    protected PulsarKeyExtractor<Row> keyExtractor;
+    protected String[] fieldNames;
+    protected TypeInformation[] fieldTypes;
+    protected final String routingKeyFieldName;
+
+    public PulsarTableSink(
+            String serviceUrl,
+            String topic,
+            ProducerConfiguration producerConf,
+            String routingKeyFieldName) {
+        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+        this.topic = checkNotNull(topic, "Topic is null");
+        this.producerConf = checkNotNull(producerConf, "Producer configuration not set");
+        this.routingKeyFieldName = routingKeyFieldName;
+    }
+
+    /**
+     * Create serialization schema for converting table rows into bytes.
+     *
+     * @param rowSchema the schema of the row to serialize.
+     * @return Instance of serialization schema
+     */
+    protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+
+    /**
+     * Create a deep copy of this sink.
+     *
+     * @return Deep copy of this sink
+     */
+    protected abstract PulsarTableSink createSink();
+
+    /**
+     * Returns the low-level producer.
+     */
+    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
+        return new FlinkPulsarProducer<Row>(
+                serviceUrl,
+                topic,
+                serializationSchema,
+                producerConf,
+                keyExtractor);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        checkState(fieldNames != null, "Table sink is not configured");
+        checkState(fieldTypes != null, "Table sink is not configured");
+        checkState(serializationSchema != null, "Table sink is not configured");
+        checkState(keyExtractor != null, "Table sink is not configured");
+
+        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
+        dataStream.addSink(producer);
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] fieldNames,
+                                    TypeInformation<?>[] fieldTypes) {
+
+        PulsarTableSink sink = createSink();
+
+        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
+        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
+        checkArgument(fieldNames.length == fieldTypes.length,
+                "Number of provided field names and types do not match");
+
+        RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+        sink.serializationSchema = createSerializationSchema(rowSchema);
+        sink.keyExtractor = new RowKeyExtractor(
+                routingKeyFieldName,
+                fieldNames,
+                fieldTypes);
+
+        return sink;
+    }
+
+    /**
+     * A key extractor that extracts the routing key from a {@link Row} by field name.
+     */
+    private static class RowKeyExtractor implements PulsarKeyExtractor<Row> {
+
+        private final int keyIndex;
+
+        public RowKeyExtractor(
+                String keyFieldName,
+                String[] fieldNames,
+                TypeInformation<?>[] fieldTypes) {
+            checkArgument(fieldNames.length == fieldTypes.length,
+                    "Number of provided field names and types does not match.");
+            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
+            checkArgument(keyIndex >= 0,
+                    "Key field '" + keyFieldName + "' not found");
+            checkArgument(Types.STRING.equals(fieldTypes[keyIndex]),
+                    "Key field must be of type 'STRING'");
+            this.keyIndex = keyIndex;
+        }
+
+        @Override
+        public String getKey(Row event) {
+            return (String) event.getField(keyIndex);
+        }
+    }
+}
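
For reference, a small sketch of the configure() contract: Flink hands the
sink the query's field names and types and then works with the returned deep
copy, so the original instance stays unconfigured. The field layout and
connection values below are illustrative only:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
    import org.apache.flink.streaming.connectors.pulsar.PulsarTableSink;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;
    import org.apache.pulsar.client.api.ProducerConfiguration;

    public class ConfigureExample {
        public static void main(String[] args) {
            PulsarTableSink base = new PulsarJsonTableSink(
                    "pulsar://localhost:6650",                // placeholder service URL
                    "persistent://sample/standalone/ns1/out", // placeholder topic
                    new ProducerConfiguration(),
                    "id");                                    // routing key field

            TableSink<Row> configured = base.configure(
                    new String[] {"id", "amount"},
                    new TypeInformation<?>[] {Types.STRING, Types.DOUBLE});
            // 'configured' now carries a JSON serialization schema and a key
            // extractor for "id"; emitting through 'base' would fail checkState().
        }
    }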
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
new file mode 100644
index 0000000..90dc21c
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+/**
+ * Extract key from a value.
+ */
+public interface PulsarKeyExtractor<IN> {
+
+    PulsarKeyExtractor NULL = in -> null;
+
+    /**
+     * Retrieve a key from the value.
+     *
+     * @param in the value to extract a key from.
+     * @return the extracted key.
+     */
+    String getKey(IN in);
+
+}
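
Since the interface has a single abstract method, a lambda is enough. A sketch
with a hypothetical event type:

    import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;

    public class KeyExtractorExample {
        /** Hypothetical event type, for illustration only. */
        public static class OrderEvent {
            public String orderId;
            public double amount;
        }

        public static void main(String[] args) {
            // Key each Pulsar message by the event's orderId; passing null to
            // FlinkPulsarProducer instead falls back to PulsarKeyExtractor.NULL.
            PulsarKeyExtractor<OrderEvent> byOrderId = event -> event.orderId;

            OrderEvent e = new OrderEvent();
            e.orderId = "order-42";
            System.out.println(byOrderId.getKey(e)); // prints "order-42"
        }
    }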
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
new file mode 100644
index 0000000..dfc89b9
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.serde;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema from JSON to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
+
+    /**
+     * Type information describing the result type.
+     */
+    private final TypeInformation<Row> typeInfo;
+
+    /**
+     * Field names to parse. Indices match fieldTypes indices.
+     */
+    private final String[] fieldNames;
+
+    /**
+     * Types to parse fields as. Indices match fieldNames indices.
+     */
+    private final TypeInformation<?>[] fieldTypes;
+
+    /**
+     * Object mapper for parsing the JSON.
+     */
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Flag indicating whether to fail on a missing field.
+     */
+    private boolean failOnMissingField;
+
+    /**
+     * Creates a JSON deserialization schema for the given fields and types.
+     *
+     * @param typeInfo Type information describing the result type. The field names are
+     *                 used to parse the JSON, and so are the types.
+     */
+    public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
+        Preconditions.checkNotNull(typeInfo, "Type information");
+        this.typeInfo = typeInfo;
+
+        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
+        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+    }
+
+    @Override
+    public Row deserialize(byte[] message) throws IOException {
+        try {
+            JsonNode root = objectMapper.readTree(message);
+
+            Row row = new Row(fieldNames.length);
+            for (int i = 0; i < fieldNames.length; i++) {
+                JsonNode node = root.get(fieldNames[i]);
+
+                if (node == null) {
+                    if (failOnMissingField) {
+                        throw new IllegalStateException("Failed to find field with name '"
+                                + fieldNames[i] + "'.");
+                    } else {
+                        row.setField(i, null);
+                    }
+                } else {
+                    // Read the value as specified type
+                    Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                    row.setField(i, value);
+                }
+            }
+
+            return row;
+        } catch (Throwable t) {
+            throw new IOException("Failed to deserialize JSON object.", t);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(Row nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return typeInfo;
+    }
+
+    /**
+     * Configures the failure behaviour if a JSON field is missing.
+     *
+     * <p>By default, a missing field is ignored and the field is set to null.
+     *
+     * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+     */
+    public void setFailOnMissingField(boolean failOnMissingField) {
+        this.failOnMissingField = failOnMissingField;
+    }
+
+}
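
A sketch of the deserializer on a hand-written payload; the field layout is an
assumption for illustration:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowDeserializationSchema;
    import org.apache.flink.types.Row;

    public class JsonRowDeserializationExample {
        public static void main(String[] args) throws Exception {
            RowTypeInfo schema = new RowTypeInfo(
                    new TypeInformation<?>[] {Types.STRING, Types.INT},
                    new String[] {"name", "count"});

            JsonRowDeserializationSchema deser = new JsonRowDeserializationSchema(schema);
            deser.setFailOnMissingField(false); // default: a missing field becomes null

            byte[] json = "{\"name\":\"pulsar\",\"count\":7}".getBytes(StandardCharsets.UTF_8);
            Row row = deser.deserialize(json);
            System.out.println(row); // pulsar,7
        }
    }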
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
new file mode 100644
index 0000000..503f01e
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.serde;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes an object into JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+    /**
+     * Field names in the input Row object.
+     */
+    private final String[] fieldNames;
+    /**
+     * Object mapper that is used to create output JSON objects.
+     */
+    private static ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Creates a JSON serialization schema for the given fields and types.
+     *
+     * @param rowSchema The schema of the rows to encode.
+     */
+    public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+        Preconditions.checkNotNull(rowSchema);
+        String[] fieldNames = rowSchema.getFieldNames();
+        TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
+
+        // check that no field is composite
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (fieldTypes[i] instanceof CompositeType) {
+                throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+                        "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+            }
+        }
+
+        this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public byte[] serialize(Row row) {
+        if (row.getArity() != fieldNames.length) {
+            throw new IllegalStateException(String.format(
+                    "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
+        }
+
+        ObjectNode objectNode = mapper.createObjectNode();
+
+        for (int i = 0; i < row.getArity(); i++) {
+            JsonNode node = mapper.valueToTree(row.getField(i));
+            objectNode.set(fieldNames[i], node);
+        }
+
+        try {
+            return mapper.writeValueAsBytes(objectNode);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to serialize row", e);
+        }
+    }
+}
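
A round-trip sketch pairing this serializer with the deserialization schema
above; the schema is illustrative:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowDeserializationSchema;
    import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
    import org.apache.flink.types.Row;

    public class JsonRowRoundTripExample {
        public static void main(String[] args) throws Exception {
            RowTypeInfo schema = new RowTypeInfo(
                    new TypeInformation<?>[] {Types.STRING, Types.LONG},
                    new String[] {"word", "count"});

            JsonRowSerializationSchema ser = new JsonRowSerializationSchema(schema);

            Row row = new Row(2);
            row.setField(0, "hello");
            row.setField(1, 3L);

            byte[] json = ser.serialize(row); // {"word":"hello","count":3}

            Row back = new JsonRowDeserializationSchema(schema).deserialize(json);
            System.out.println(back); // hello,3
        }
    }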
