[ 
https://issues.apache.org/jira/browse/BAHIR-99?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097669#comment-16097669
 ] 

ASF GitHub Bot commented on BAHIR-99:
-------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/17#discussion_r128921695
  
    --- Diff: 
flink-connector-kudu/src/main/java/es/accenture/flink/Sink/KuduOutputFormat.java
 ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package es.accenture.flink.Sink;
    +
    +import es.accenture.flink.Utils.Exceptions.KuduClientException;
    +import es.accenture.flink.Utils.Exceptions.KuduTableException;
    +import es.accenture.flink.Utils.RowSerializable;
    +import es.accenture.flink.Utils.Utils;
    +import org.apache.flink.api.common.io.RichOutputFormat;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.kudu.client.*;
    +import org.apache.log4j.Logger;
    +
    +import java.io.IOException;
    +
    +
    +public class KuduOutputFormat extends RichOutputFormat<RowSerializable> {
    +
    +    private String host, tableName;
    +    private Integer tableMode;
    +    private String[] fieldsNames;
    +    private transient Utils utils;
    +
    +    //Kudu variables
    +    private transient KuduTable table;
    +
    +    //Modes
    +    public static final Integer CREATE = 1;
    +    public static final Integer APPEND = 2;
    +    public static final Integer OVERRIDE = 3;
    +
    +
    +    //LOG4J
    +    private final static Logger logger = 
Logger.getLogger(KuduOutputFormat.class);
    +    private static final Object lock = new Object();
    +    /**
    +     * Builder to use when you want to create a new table
    +     *
    +     * @param host        Kudu host
    +     * @param tableName   Kudu table name
    +     * @param fieldsNames List of column names in the table to be created
    +     * @param tableMode   Way to operate with table (CREATE, APPEND, 
OVERRIDE)
    +     */
    +    public KuduOutputFormat(String host, String tableName, String[] 
fieldsNames, Integer tableMode) throws KuduException, KuduTableException, 
KuduClientException {
    +        if (tableMode == null || ((!tableMode.equals(CREATE)) && 
(!tableMode.equals(APPEND)) && (!tableMode.equals(OVERRIDE)))) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableMode\" 
not valid (null or empty)");
    +
    +        } else if (!(tableMode.equals(CREATE) || tableMode.equals(APPEND) 
|| tableMode.equals(OVERRIDE))) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableMode\" 
not valid (must be CREATE, APPEND or OVERRIDE)");
    +
    +        } else if (tableMode.equals(CREATE)) {
    +            if (fieldsNames == null || fieldsNames.length == 0)
    +                throw new IllegalArgumentException("ERROR: Missing param 
\"fieldNames\". Can't create a table without column names");
    +
    +        } else if (host == null || host.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"host\" not 
valid (null or empty)");
    +
    +        } else if (tableName == null || tableName.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableName\" 
not valid (null or empty)");
    +        }
    +
    +        this.host = host;
    +        this.tableName = tableName;
    +        this.fieldsNames = fieldsNames;
    +        this.tableMode = tableMode;
    +
    +    }
    +
    +    /**
    +     * Builder to be used when using an existing table
    +     *
    +     * @param host      Kudu host
    +     * @param tableName Kudu table name to be used
    +     * @param tableMode Way to operate with table (CREATE, APPEND, 
OVERRIDE)
    +     * @throws KuduClientException In case of exception caused by Kudu 
Client
    +     * @throws KuduTableException In case of exception caused by Kudu 
Tablet
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public KuduOutputFormat(String host, String tableName, Integer 
tableMode) throws KuduException, KuduTableException, KuduClientException {
    +        if (tableMode == null || ((!tableMode.equals(CREATE)) && 
(!tableMode.equals(APPEND)) && (!tableMode.equals(OVERRIDE)))) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableMode\" 
not valid (null or empty)");
    +
    +        } else if (tableMode.equals(CREATE)) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableMode\" 
can't be CREATE if missing \"fieldNames\". Use other builder for this mode");
    +
    +        } else if (!(tableMode.equals(APPEND) || 
tableMode.equals(OVERRIDE))) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableMode\" 
not valid (must be APPEND or OVERRIDE)");
    +
    +        } else if (host == null || host.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"host\" not 
valid (null or empty)");
    +
    +        } else if (tableName == null || tableName.isEmpty()) {
    +            throw new IllegalArgumentException("ERROR: Param \"tableName\" 
not valid (null or empty)");
    +        }
    +
    +        this.host = host;
    +        this.tableName = tableName;
    +        this.tableMode = tableMode;
    +
    +    }
    +
    +
    +    @Override
    +    public void configure(Configuration configuration) {
    +
    +    }
    +
    +    @Override
    +    public void open(int i, int i1) throws IOException {
    +
    +        // Establish connection with Kudu
    +        this.utils = new Utils(host);
    +        if(this.utils.getClient().tableExists(tableName)){
    +            logger.info("Mode is CREATE and table already exist. Changed 
mode to APPEND. Warning, parallelism may be less efficient");
    +            tableMode = APPEND;
    +        }
    +
    +        // Case APPEND (or OVERRIDE), with builder without column names, 
because otherwise it throws a NullPointerException
    +        if(tableMode.equals(APPEND) || tableMode.equals(OVERRIDE)) {
    +            this.table = utils.useTable(tableName, tableMode);
    +
    +            if (fieldsNames == null || fieldsNames.length == 0) {
    +                fieldsNames = utils.getNamesOfColumns(table);
    +            } else {
    +                // When column names provided, and table exists, must 
check if column names match
    +                
utils.checkNamesOfColumns(utils.getNamesOfColumns(this.table), fieldsNames);
    +            }
    +
    +        }
    +
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.utils.getClient().close();
    +    }
    +
    +    /**
    +     * It's responsible to insert a row into the indicated table by the 
builder (Batch)
    +     *
    +     * @param row   Data of a row to insert
    +     * */
    +    @Override
    +    public void writeRecord(RowSerializable row) throws IOException {
    +
    +        if(tableMode.equals(CREATE)){
    +            if (!utils.getClient().tableExists(tableName)) {
    --- End diff --
    
    How expensive is this `tableExists()` call? I assume it'll be executed on 
each record to write.


> Kudu connector to read/write from/to Kudu
> -----------------------------------------
>
>                 Key: BAHIR-99
>                 URL: https://issues.apache.org/jira/browse/BAHIR-99
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Rubén Casado
>            Assignee: Rubén Casado
>             Fix For: Flink-Next
>
>
> Java library to integrate Apache Kudu and Apache Flink. Main goal is to be 
> able to read/write data from/to Kudu using the DataSet and DataStream Flink's 
> APIs.
> Data flows patterns:
> Batch
>  - Kudu -> DataSet<RowSerializable> -> Kudu
>  - Kudu -> DataSet<RowSerializable> -> other source
>  - Other source -> DataSet<RowSerializable> -> other source
> Stream
>  - Other source -> DataStream <RowSerializable> -> Kudu
> Code is available in https://github.com/rubencasado/Flink-Kudu



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to