awsUser123 opened a new issue, #6475:
URL: https://github.com/apache/hudi/issues/6475

   Hey guys, I am trying to implement reading from kinesis data streams and 
storing it into an s3 bucket using Hudi.
   I was able to add the data into s3 by referring and running the following 
code-  
https://github.com/awsalialem/amazon-kinesis-data-analytics-java-examples/blob/master/S3Sink/src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingSinkJob.java
   
   I wanted to know how I can further store the data into S3 using Hudi 
connector while reading from kinesis data streams. Is there any modification I 
can do to the code?
   
   I wrote the following code but this fails the Flink job -
   
   ` import lombok.NonNull;
    import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.java.tuple.Tuple2;
    import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.configuration.Configuration;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Entry point into the NZTIlStream Flink application.
     *
     */
    @Log4j2
    public final class Streaming {
        /**
         * Private constructor.
         */
        private Streaming() {
            throw new UnsupportedOperationException("Creating an instance is 
not allowed!");
        }
    
        private static final String REGION = "us-west-2";
        private static final String INPUTSTREAMNAME = "stream_name";
        private static final String S3SINKPATH = "s3://ka-app-bucketname/data";
   
    
        //creating kinesis data streams as the source
        private static DataStream<String> 
createSourceFromStaticConfig(StreamExecutionEnvironment env) {
            Properties inputProperties = new Properties();
            inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, 
REGION);
            
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"LATEST");
    
            return env.addSource(new FlinkKinesisConsumer<>(INPUTSTREAMNAME, 
new SimpleStringSchema(), inputProperties));
        }
        //creating S3 bucket as the sink
        private static StreamingFileSink<String> createS3SinkFromStaticConfig() 
{
    
            final StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path(S3SINKPATH), new 
SimpleStringEncoder<String>("UTF-8"))
                    .build();
            return sink;
        }
    
   
        /**
         * Main method.
         *
         * @param args the cli args used
         * @throws Exception when the jobs fails to execute
         */
        public static void main(@NonNull final String[] args) throws Exception {
    
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<String> input = createSourceFromStaticConfig(env);
    
            ObjectMapper jsonParser = new ObjectMapper();
    
            input.map(value -> { // Parse the JSON
                        JsonNode jsonNode = jsonParser.readValue(value, 
JsonNode.class);
                        return new Tuple2<>(jsonNode.get("TICKER").toString(), 
1);
                    }).returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(0) // Logically partition the stream for each word
                    .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) 
// Flink 1.13
                    .sum(1) // Count the appearances by ticker per partition
                    .map(value -> value.f0 + " count: " + value.f1.toString() + 
"\n")
                    .addSink(createS3SinkFromStaticConfig());
    
    
            env.execute("Flink S3 Streaming Sink Job");
            KinesisHudiSqlExample.createAndDeployJob(env, S3SINKPATH, 
INPUTSTREAMNAME, REGION);
        }
    
        public static class KinesisHudiSqlExample {
    
            public static void createAndDeployJob(StreamExecutionEnvironment 
env, String s3Path,
                                                  String inputKinesisStream, 
String region) {
                StreamTableEnvironment streamTableEnvironment = 
StreamTableEnvironment.create(
                        env, 
EnvironmentSettings.newInstance().useBlinkPlanner().build());
    
                Configuration configuration = 
streamTableEnvironment.getConfig().getConfiguration();
                configuration.setString("execution.checkpointing.interval", "1 
min");
    
                final String createTableStmt = String.format("CREATE TABLE IF 
NOT EXISTS Stock_table (" +
                        "  `event_time` STRING, " +
                        "  `ticker` STRING, " +
                        "  `price` DECIMAL " +
                        "   )  PARTITIONED BY (`ticker`)" +
                        "   WITH (" +
                        "  'connector' = 'kinesis'," +
                        "  'stream' = '%s'," +
                        "  'aws.region' = '%s'," +
                        "  'scan.stream.initpos' = 'LATEST'," +
                        "  'format' = 'csv'," +
                        "   )", inputKinesisStream, region);
    
                final String s3Sink = String.format("CREATE TABLE Hudi_table (" 
+
                        "      `event_time` STRING," +
                        "      `ticker` STRING, " +
                        "      `price` DECIMAL " +
                        "    ) PARTITIONED BY (`ticker`)" +
                        "    WITH (" +
                        "    'connector' = 'hudi'," +
                        "    'path' = '%s'," +
                        "    'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ 
table or, by default is COPY_ON_WRITE" +
                        "    )", s3Path);
    
                streamTableEnvironment.executeSql(createTableStmt);
                streamTableEnvironment.executeSql(s3Sink);
    
                final String selectSQL = "select * from Stock_table";
                streamTableEnvironment.executeSql(selectSQL);
    
                final String insertSQL = "insert into Hudi_table ticker, price, 
event_time from Stock_table ";
                streamTableEnvironment.executeSql(insertSQL);
    
            }
        }
    
            /**
         * Check if local environment setup flag is set.
         *
         * @param args the cli args
         * @return true if flag is set
         */
        private static boolean isLocalFlagSet(@NonNull final String[] args) {
            return Arrays.stream(args)
                    .anyMatch(s -> 
s.equalsIgnoreCase(Constants.LOCAL_ENVIRONMENT_FLAG));
        }
    
    }`
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to