awsUser123 opened a new issue, #6475: URL: https://github.com/apache/hudi/issues/6475
Hey guys, I am trying to read from Kinesis Data Streams and store the data in an S3 bucket using Hudi. I was able to write the data to S3 by referring to and running the following code: https://github.com/awsalialem/amazon-kinesis-data-analytics-java-examples/blob/master/S3Sink/src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingSinkJob.java I wanted to know how I can further store the data in S3 using the Hudi connector while reading from Kinesis Data Streams. Is there any modification I can make to the code? I wrote the following code, but the Flink job fails: ` import lombok.NonNull; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.configuration.Configuration; import java.util.Arrays; import java.util.Properties; /** * Entry point into the NZTIlStream Flink application. * */ @Log4j2 public final class Streaming { /** * Private constructor. 
*/ private Streaming() { throw new UnsupportedOperationException("Creating an instance is not allowed!"); } private static final String REGION = "us-west-2"; private static final String INPUTSTREAMNAME = "stream_name"; private static final String S3SINKPATH = "s3://ka-app-bucketname/data"; //creating kinesis data streams as the source private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(INPUTSTREAMNAME, new SimpleStringSchema(), inputProperties)); } //creating S3 bucket as the sink private static StreamingFileSink<String> createS3SinkFromStaticConfig() { final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(S3SINKPATH), new SimpleStringEncoder<String>("UTF-8")) .build(); return sink; } /** * Main method. 
* * @param args the cli args used * @throws Exception when the jobs fails to execute */ public static void main(@NonNull final String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = createSourceFromStaticConfig(env); ObjectMapper jsonParser = new ObjectMapper(); input.map(value -> { // Parse the JSON JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); return new Tuple2<>(jsonNode.get("TICKER").toString(), 1); }).returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(0) // Logically partition the stream for each word .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // Flink 1.13 .sum(1) // Count the appearances by ticker per partition .map(value -> value.f0 + " count: " + value.f1.toString() + "\n") .addSink(createS3SinkFromStaticConfig()); env.execute("Flink S3 Streaming Sink Job"); KinesisHudiSqlExample.createAndDeployJob(env, S3SINKPATH, INPUTSTREAMNAME, REGION); } public static class KinesisHudiSqlExample { public static void createAndDeployJob(StreamExecutionEnvironment env, String s3Path, String inputKinesisStream, String region) { StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); Configuration configuration = streamTableEnvironment.getConfig().getConfiguration(); configuration.setString("execution.checkpointing.interval", "1 min"); final String createTableStmt = String.format("CREATE TABLE IF NOT EXISTS Stock_table (" + " `event_time` STRING, " + " `ticker` STRING, " + " `price` DECIMAL " + " ) PARTITIONED BY (`ticker`)" + " WITH (" + " 'connector' = 'kinesis'," + " 'stream' = '%s'," + " 'aws.region' = '%s'," + " 'scan.stream.initpos' = 'LATEST'," + " 'format' = 'csv'," + " )", inputKinesisStream, region); final String s3Sink = String.format("CREATE TABLE Hudi_table (" + " `event_time` STRING," + " 
`ticker` STRING, " + " `price` DECIMAL " + " ) PARTITIONED BY (`ticker`)" + " WITH (" + " 'connector' = 'hudi'," + " 'path' = '%s'," + " 'table.type' = 'MERGE_ON_READ' -- MERGE_ON_READ table or, by default is COPY_ON_WRITE" + " )", s3Path); streamTableEnvironment.executeSql(createTableStmt); streamTableEnvironment.executeSql(s3Sink); final String selectSQL = "select * from Stock_table"; streamTableEnvironment.executeSql(selectSQL); final String insertSQL = "insert into Hudi_table ticker, price, event_time from Stock_table "; streamTableEnvironment.executeSql(insertSQL); } } /** * Check if local environment setup flag is set. * * @param args the cli args * @return true if flag is set */ private static boolean isLocalFlagSet(@NonNull final String[] args) { return Arrays.stream(args) .anyMatch(s -> s.equalsIgnoreCase(Constants.LOCAL_ENVIRONMENT_FLAG)); } }` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org