[ 
https://issues.apache.org/jira/browse/BAHIR-99?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097666#comment-16097666
 ] 

ASF GitHub Bot commented on BAHIR-99:
-------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/17#discussion_r128921756
  
    --- Diff: 
flink-connector-kudu/src/main/java/es/accenture/flink/Sink/KuduSink.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package es.accenture.flink.Sink;
    +
    +import es.accenture.flink.Utils.Exceptions.KuduClientException;
    +import es.accenture.flink.Utils.RowSerializable;
    +import es.accenture.flink.Utils.Utils;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.kudu.client.KuduTable;
    +import org.apache.log4j.Logger;
    +
    +import java.io.IOException;
    +
    +public class KuduSink extends RichSinkFunction<RowSerializable>{
    +
    +    private String host, tableName;
    +    private String [] fieldsNames;
    +    private transient Utils utils;
    +
    +    //Kudu variables
    +    private transient KuduTable table;
    +
    +    // LOG4J
    +
    +    private final static Logger logger = Logger.getLogger(KuduSink.class);
    +
    +    /**
    +     * Builder to use when you want to create a new table
    +     *
    +     * @param host          Kudu host
    +     * @param tableName     Kudu table name
    +     * @param fieldsNames   List of column names in the table to be created
    +     * @throws KuduClientException In case of exception caused by Kudu 
Client
    +     */
    +    public KuduSink (String host, String tableName, String [] fieldsNames) 
throws KuduClientException {
    +
    +        if (host == null || host.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"host\" not 
valid (null or empty)");
    +
    +        } else if (tableName == null || tableName.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableName\" 
not valid (null or empty)");
    +
    +        }
    +        this.host = host;
    +        this.tableName = tableName;
    +        this.fieldsNames = fieldsNames;
    +    }
    +
    +    /**
    +     * Builder to be used when using an existing table
    +     *
    +     * @param host          Kudu host
    +     * @param tableName     Kudu table name
    +     * @throws KuduClientException In case of exception caused by Kudu 
Client
    +     */
    +    public KuduSink (String host, String tableName) throws 
KuduClientException {
    +
    +        if (host == null || host.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"host\" not 
valid (null or empty)");
    +
    +        } else if (tableName == null || tableName.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableName\" 
not valid (null or empty)");
    +        }
    +
    +        this.host = host;
    +        this.tableName = tableName;
    +    }
    +
    +    /**
    +     * It's responsible to insert a row into the indicated table by the 
builder (Streaming)
    +     *
    +     * @param row   Data of a row to insert
    +     */
    +    @Override
    +    public void invoke(RowSerializable row) throws IOException {
    +
    +        // Establish connection with Kudu
    +        if (this.utils == null)
    +            this.utils = new Utils(host);
    +
    +        if (this.table == null)
    +            this.table = this.utils.useTable(tableName, fieldsNames, row);
    +
    +
    +        // Make the insert into the table
    +        utils.insert(table, row, fieldsNames);
    +
    +        logger.info("Inserted the Row: | " + utils.printRow(row) + "at the 
table \"" + this.tableName + "\"");
    --- End diff --
    
    We suggest to use parameterized messages in log4j for performance reasons: 
https://logging.apache.org/log4j/2.0/manual/messages.html
    Here, you'll concat 5 strings, even if logging is disabled.
    
    Also, there's a space missing in + "at the table".


> Kudu connector to read/write from/to Kudu
> -----------------------------------------
>
>                 Key: BAHIR-99
>                 URL: https://issues.apache.org/jira/browse/BAHIR-99
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Rubén Casado
>            Assignee: Rubén Casado
>             Fix For: Flink-Next
>
>
> Java library to integrate Apache Kudu and Apache Flink. Main goal is to be 
> able to read/write data from/to Kudu using the DataSet and DataStream Flink's 
> APIs.
> Data flows patterns:
> Batch
>  - Kudu -> DataSet<RowSerializable> -> Kudu
>  - Kudu -> DataSet<RowSerializable> -> other source
>  - Other source -> DataSet<RowSerializable> -> other source
> Stream
>  - Other source -> DataStream <RowSerializable> -> Kudu
> Code is available in https://github.com/rubencasado/Flink-Kudu



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to