soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370164850

   Here are the details again; I tried this again this morning.
   
   #### Please note: this time I am on US-WEST-2; previously I was trying on US-EAST-1
   
   #### Kinesis Streams 
   
![image](https://user-images.githubusercontent.com/39345855/210422439-4c008e1e-ff85-421b-8fc5-75b8b635896d.png)
   
   #### Python Code to Publish Dummy Data
    ```
    try:
        import datetime
        import json
        import random
        import boto3
        import os
        import uuid
        import time
        from faker import Faker

        from dotenv import load_dotenv
        load_dotenv(".env")
    except Exception as e:
        pass

    faker = Faker()


    def getReferrer():
        # Build one random stock tick event
        data = {}
        now = datetime.datetime.now()
        data['uuid'] = str(uuid.uuid4())
        data['event_time'] = now.isoformat()
        data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
        price = random.random() * 100
        data['price'] = round(price, 2)
        return data


    # Create the client once, outside the loop
    kinesis_client = boto3.client(
        'kinesis',
        region_name=os.getenv("DEV_AWS_REGION_NAME"),
        aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("DEV_SECRET_KEY")
    )

    # Publish one event every 3 seconds
    while True:
        data = json.dumps(getReferrer())
        print(data)
        res = kinesis_client.put_record(
            StreamName="stock-streams",
            Data=data,
            PartitionKey="1")
        time.sleep(3)
    ```
   #### KDA
   
   
![image](https://user-images.githubusercontent.com/39345855/210422526-8e61f16e-c457-4aea-8a2f-c59ec03172f7.png)
   
    #### Settings: Added JAR Files
   
![image](https://user-images.githubusercontent.com/39345855/210422789-a6f08f42-2ad9-46c6-a8d9-b05ba150a6ba.png)
   
   ```
   %flink.ssql(type=update)
   
   DROP TABLE if exists stock_table;
   
   CREATE TABLE stock_table (
       uuid varchar,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3),
       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   )
   PARTITIONED BY (ticker)
   WITH (
       'connector' = 'kinesis',
       'stream' = 'stock-streams',
       'aws.region' = 'us-west-2',
       'scan.stream.initpos' = 'LATEST',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
   );
   ```
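
    A quick way to confirm the source table is actually receiving events from Kinesis (this validation query is my addition, not part of the original notebook) is a plain streaming select on the table just created:

    ```
    %flink.ssql(type=update)

    SELECT * FROM stock_table;
    ```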
   
![image](https://user-images.githubusercontent.com/39345855/210428825-c54b6e4b-c262-409d-ae25-ccb134c0b011.png)
   
   ```
   %flink.ssql(type=update)
   
   DROP TABLE if exists stock_table_hudi;
   
   CREATE TABLE stock_table_hudi(
       uuid varchar,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3)
   )
   WITH (
     'connector' = 'hudi',
     'path' = 's3://soumil-dms-learn',
      'table.type' = 'MERGE_ON_READ'  -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
   );
   ```
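
    For reference, the streaming insert that fails (the statement is quoted verbatim in the stack trace below) was:

    ```
    %flink.ssql(type=update)

    INSERT INTO stock_table_hudi
    SELECT uuid, ticker, price, event_time as ts from stock_table;
    ```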
   
![image](https://user-images.githubusercontent.com/39345855/210429264-f131e2ab-14ee-47a9-beff-a98c8c598b3f.png)
   
    #### Real Time Data
   
![image](https://user-images.githubusercontent.com/39345855/210429752-6baaff85-eaf6-44b2-bb9e-cdf68b34ac06.png)
   
   
    #### Error Messages (same as above)
   
    ```
    ConnectException: Connection refused
    java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
    SELECT  uuid, ticker, price, event_time as ts from stock_table'.
         at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
         at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
         at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
         at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
         at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
         at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
         at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
         at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
         at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
         at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
         at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
         at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
         at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
    SELECT  uuid, ticker, price, event_time as ts from stock_table'.
         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
         at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
         at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
         at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
         ... 14 more
    Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
         at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
         at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
         at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
         at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
         at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
         ... 1 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
         at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
         at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
         at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83)
         at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
         ... 5 more
    Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
         at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
         at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
         at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
         at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
         at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
         ... 1 more
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
         at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
         at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
         at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
         ... 19 more
    Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
    Caused by: java.net.ConnectException: Connection refused
         at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
         at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
         at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
         at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
         at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
         at java.base/java.lang.Thread.run(Thread.java:829)
    ```
   
![image](https://user-images.githubusercontent.com/39345855/210430926-2589ee44-8403-4383-a1e6-81a21b14a063.png)
   