soumilshah1995 commented on issue #7591: URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370164850
Here is details again i have tried again this morning #### Please note This time i am on US-WEST-2 previously i was trying on US-EAST-1 #### Kinesis Streams ![image](https://user-images.githubusercontent.com/39345855/210422439-4c008e1e-ff85-421b-8fc5-75b8b635896d.png) ### Python Code to Dump Dummy Data ``` try: import datetime import json import random import boto3 import os import uuid import time from faker import Faker from dotenv import load_dotenv load_dotenv(".env") except Exception as e: pass global faker faker = Faker() def getReferrer(): data = {} now = datetime.now() str_now = now.isoformat() data['uuid'] = str(uuid.uuid4()) data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) global kinesis_client kinesis_client = boto3.client('kinesis', region_name=os.getenv("DEV_AWS_REGION_NAME"), aws_access_key_id=os.getenv("DEV_ACCESS_KEY"), aws_secret_access_key=os.getenv("DEV_SECRET_KEY") ) res = kinesis_client.put_record( StreamName="stock-streams", Data=data, PartitionKey="1") time.sleep(3) ``` #### KDA ![image](https://user-images.githubusercontent.com/39345855/210422526-8e61f16e-c457-4aea-8a2f-c59ec03172f7.png) #### Settings Added JAR Files ![image](https://user-images.githubusercontent.com/39345855/210422789-a6f08f42-2ad9-46c6-a8d9-b05ba150a6ba.png) ``` %flink.ssql(type=update) DROP TABLE if exists stock_table; CREATE TABLE stock_table ( uuid varchar, ticker VARCHAR, price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = 'stock-streams', 'aws.region' = 'us-west-2', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ); ``` ![image](https://user-images.githubusercontent.com/39345855/210428825-c54b6e4b-c262-409d-ae25-ccb134c0b011.png) ``` %flink.ssql(type=update) DROP TABLE if exists stock_table_hudi; CREATE TABLE stock_table_hudi( uuid varchar, ticker VARCHAR, price DOUBLE, event_time TIMESTAMP(3) ) WITH ( 'connector' = 'hudi', 'path' = 's3://soumil-dms-learn', 'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE ); ``` ![image](https://user-images.githubusercontent.com/39345855/210429264-f131e2ab-14ee-47a9-beff-a98c8c598b3f.png) # Real Time Data ![image](https://user-images.githubusercontent.com/39345855/210429752-6baaff85-eaf6-44b2-bb9e-cdf68b34ac06.png) ## Error Messages Same as above ``` ConnectException: Connection refused java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi SELECT uuid, ticker, price, event_time as ts from stock_table'. at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112) at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi SELECT uuid, ticker, price, event_time as ts from stock_table'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848) at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532) ... 14 more Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160) at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ... 1 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83) at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144) ... 5 more Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted. at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ... 1 more Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ... 19 more Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082 Caused by: java.net.ConnectException: Connection refused at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:829) ``` ![image](https://user-images.githubusercontent.com/39345855/210430926-2589ee44-8403-4383-a1e6-81a21b14a063.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org