Traceback (most recent call last):
  File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 59, in <module>
    log_processing()
  File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 34, in log_processing
    table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 836, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
    raise java_exception
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2460)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
    ... 20 more

Process finished with exit code 1
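The last "Caused by" above is the actual root cause: Java deserialization found two copies of org.apache.kafka.clients.consumer.OffsetResetStrategy loaded by different classloaders, so it cannot assign an instance of one to a field typed with the other. With the jar setup in the code below, the most likely trigger is that kafka-clients-3.3.2.jar and flink-connector-kafka-1.16.1.jar are registered alongside flink-sql-connector-kafka-1.16.1.jar, which already bundles both. A minimal sketch of the likely fix, assuming the same jar paths as in the posted code:

# Sketch: register only the fat SQL connector jar. It already bundles the
# Kafka connector and its kafka-clients dependency, so also adding the other
# two jars loads the same classes twice and can trigger the
# ClassCastException above.
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")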
On Mon, Feb 6, 2023 at 11:05 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, could you please share with us the root cause?
> The error message you posted doesn't seem to contain the root cause. Maybe
> you can post the full error message.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
> *Cc: *"dev" <dev@flink.apache.org>
> *Sent: *Tuesday, February 7, 2023, 10:39:25 AM
> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>
> Thank you for your reply. I tried it with a sample stream, but it did not
> work. I am trying to get the results from my producer here with a very
> simple query. I want to see the results in the console/output.
> This is my code:
>
> // Docker: docker-compose.yml
>
> version: '2'
> services:
>   zookeeper:
>     image: confluentinc/cp-zookeeper:6.1.1
>     hostname: zookeeper
>     container_name: zookeeper
>     ports:
>       - "2181:2181"
>     environment:
>       ZOOKEEPER_CLIENT_PORT: 2181
>       ZOOKEEPER_TICK_TIME: 2000
>
>   broker:
>     image: confluentinc/cp-kafka:6.1.1
>     hostname: broker
>     container_name: broker
>     depends_on:
>       - zookeeper
>     ports:
>       - "29092:29092"
>       - "9092:9092"
>       - "9101:9101"
>     environment:
>       KAFKA_BROKER_ID: 1
>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>
> // Producer
>
> import json
> import sys
>
> from kafka import KafkaProducer
>
> KAFKA_SERVER = "127.0.0.1:9092"
>
> def serializer(dictionary):
>     try:
>         message = json.dumps(dictionary)
>     except Exception as e:
>         sys.stderr.write(str(e) + '\n')
>         message = str(dictionary)
>     return message.encode('utf8')
>
> def create_sample_empad_json(raw_id):
>     return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}
>
> def do_produce():
>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
>                              value_serializer=serializer)
>     for raw_id in range(1, 10):
>         empad_json = create_sample_empad_json(raw_id)
>         producer.send('EMPAD', empad_json)
>     producer.flush()
>
> if __name__ == '__main__':
>     do_produce()
>
> // Flink
>
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.table_environment import StreamTableEnvironment
>
> def data_processing():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env,
>                                           environment_settings=settings)
>
>     t1 = """
>         CREATE TEMPORARY TABLE raw_table(
>             raw_id INT,
>             raw_data STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'EMPAD',
>             'properties.bootstrap.servers' = 'localhost:9092',
>             'properties.group.id' = 'MY_GRP',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>     """
>
>     t_env.execute_sql(t1)
>
>     table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
>
>     with table_result.collect() as results:
>         for result in results:
>             print(result)
>
> if __name__ == '__main__':
>     data_processing()
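A side note on the DDL above: with 'scan.startup.mode' = 'latest-offset' the job only sees records produced after it starts, so if the producer has already run, collect() will wait on an empty stream even once the jar issue is fixed. For debugging, 'earliest-offset' replays the topic from the beginning. The same table definition with only that option changed, as a sketch (it does not address the ClassCastException itself):

# Same raw_table definition, but reading from the earliest offset so records
# produced before the job started are also consumed.
t1 = """
    CREATE TEMPORARY TABLE raw_table(
        raw_id INT,
        raw_data STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'EMPAD',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'MY_GRP',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""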
t_env.execute_sql("select raw_id, raw_data from raw_table") > > with table_result.collect() as results: > for result in results: > print(result) > > if __name__ == '__main__': > data_processing() > > > > getting this error message: > > pyflink.util.exceptions.TableException: > org.apache.flink.table.api.TableException: Failed to execute sql > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:750) > Caused by: org.apache.flink.util.FlinkException: Failed to execute job > 'collect'. > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203) > at > org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) > at > org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) > ... 13 more > > > Best, > > Amir > > > On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote: > >> Hi, thanks for reaching me out. >> For your question, you don't need to cosume data in my cosumer class >> seperately and then insert them into those tables. The data will >> be consumed from what we implemented here. >> >> Best regards, >> Yuxia >> >> ------------------------------ >> *发件人: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com> >> *收件人: *luoyu...@alumni.sjtu.edu.cn >> *发送时间: *星期日, 2023年 2 月 05日 上午 6:07:02 >> *主题: *Re: Need help how to use Table API to join two Kafka streams >> >> Dear Yuxia, dev@flink.apache.org >> Thank you again for your help. I am implementing code in Python. But I am >> still have some confusion about my application. 
>> As I mentioned before, I am sending two simple messages (JSON) on two >> different topics: >> This is my Kafka producer class: >> >> import json >> import sys >> >> from kafka import KafkaProducer >> >> def serializer(dictionary): >> try: >> message = json.dumps(dictionary) >> except Exception as e: >> sys.stderr.write(str(e) + '\n') >> message = str(dictionary) >> return message.encode('utf8') >> >> def create_sample_json(row_id): >> return {'row_id':int(row_id), 'my_data': str(int(row_id) + 7)} >> >> def do_produce(topic_name): >> producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, >> value_serializer=serializer) >> for row_id in range(1,10): >> my_data = data_helper.create_sample_json(row_id) >> producer.send(topic_name, my_data) >> producer.flush() >> >> if __name__ == '__main__': >> do_produce('topic1') >> do_produce('topic2') >> >> ================================================================================== >> >> As you helped me, this is my Flink Consumer that I want to cosnume data from >> producer and run queries on them: >> >> from pyflink.datastream.stream_execution_environment import >> StreamExecutionEnvironment >> from pyflink.table import EnvironmentSettings >> from pyflink.table.expressions import col >> from pyflink.table.table_environment import StreamTableEnvironment >> >> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC >> >> KAFKA_SERVERS = 'localhost:9092' >> >> def log_processing(): >> env = StreamExecutionEnvironment.get_execution_environment() >> env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar") >> >> env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar") >> >> env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar") >> >> settings = EnvironmentSettings.new_instance() \ >> .in_streaming_mode() \ >> .build() >> >> t_env = StreamTableEnvironment.create(stream_execution_environment=env, >> environment_settings=settings) >> >> t1 = f""" >> CREATE TEMPORARY TABLE table1( >> row_id INT, >> row_data STRING >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = 'topic1', >> 'properties.bootstrap.servers' = '{KAFKA_SERVERS}', >> 'properties.group.id' = 'MY_GRP', >> 'scan.startup.mode' = 'latest-offset', >> 'format' = 'json' >> ) >> """ >> >> t2 = f""" >> CREATE TEMPORARY TABLE table2( >> row_id INT, >> row_data STRING >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = 'table2', >> 'properties.bootstrap.servers' = '{KAFKA_SERVERS}', >> 'properties.group.id' = 'MY_GRP', >> 'scan.startup.mode' = 'latest-offset', >> 'format' = 'json' >> ) >> """ >> >> t_env.execute_sql(t1) >> t_env.execute_sql(t2) >> >> t3 = t_env.sql_query("SELECT row_id, row_data as my_raw_data FROM >> table2") >> >> // please tell me what should I do next: >> >> // Questions: >> >> // 1) Do I need to cosume data in my cosumer class seperately and then >> insert them into those tables or data will be consumed from >> >> what we implemented here (as I passed the name of the connector, >> toipc, bootstartap.servers, etc...)? >> >> // 2) If so: >> >> 2.1) how can I make join from those streams in Python? >> >> 2.2) How can I prevant the previous data as my rocedure will send >> thousands messages in each topic. I want to make sure that >> >> not to make duplicate queries. >> >> // 3) If not, what should I do? >> >> >> Thank you very much. >> >> Amir >> >> >> >> >> >> >> >> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote: >> >>> Hi, Amir. 
>>> May look like using scala code: >>> >>> val t1 = tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn >>> string) WITH ('connector' = 'kafka', ...); >>> val t2 = tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn >>> string) WITH ('connector' = 'kafka', ...); >>> >>> // you will need to rename the field to join, otherwise, it'll >>> "org.apache.flink.table.api.ValidationException: Ambiguous column name: >>> ssn". >>> val t3 = tableEnv.sqlQuery("SELECT id, ssn as ssn1 FROM s2") >>> val result = t1.join(t3).where($"ssn" === $"ssn1"); >>> >>> Also, you can refer here for more detail[1]. >>> [1] >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins >>> >>> Best regards, >>> Yuxia >>> >>> ----- 原始邮件 ----- >>> 发件人: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com> >>> 收件人: "dev" <dev@flink.apache.org> >>> 发送时间: 星期五, 2023年 2 月 03日 上午 4:45:08 >>> 主题: Need help how to use Table API to join two Kafka streams >>> >>> Hello, >>> >>> I have a Kafka producer and a Kafka consumer that produces and consumes >>> multiple data respectively. You can think of two data sets here. Both >>> datasets have a similar structure but carry different data. >>> >>> I want to implement a Table API to join two Kafka streams while I >>> consume them. For example, data1.ssn==data2.ssn >>> >>> Constraints: >>> I don't want to change my producer or use FlinkKafkaProducer. >>> >>> Thank you very much. >>> >>> Best, >>> Amir >>> >> >