Thank you for your reply. I tried it with a sample stream, but it did not work. I am trying to get the results from my producer here with a very simple query, and I want to see the results in the console/output.
This is my code:

// Docker: docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

// Producer

import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = "127.0.0.1:9092"

def serializer(dictionary):
    # Encode the dict as UTF-8 JSON; fall back to str() if it is not serializable.
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = create_sample_empad_json(raw_id)  # was data_helper.create_sample_empad_json; the helper is defined in this module
        producer.send('EMPAD', empad_json)
    producer.flush()

if __name__ == '__main__':
    do_produce()  # was do_produce(XRD_PATH), but do_produce takes no arguments

// Flink

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = """
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("SELECT raw_id, raw_data FROM raw_table")
    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()

I am getting this error message:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more

Best,
Amir

On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, thanks for reaching out.
> To answer your question: you don't need to consume the data separately in
> your consumer class and then insert it into those tables. The data will be
> consumed by what we implemented here.
>
> Best regards,
> Yuxia
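In other words, once a table is declared with the Kafka connector, executing a query against it is itself the consumer. A minimal sketch of that idea, assuming the table1 DDL and t_env from the quoted message below:

# Declaring the table does not read anything by itself; executing a query on it
# starts the streaming job that consumes the Kafka topic. Iterating over
# collect() is what actually pulls the records -- no separate KafkaConsumer needed.
result = t_env.execute_sql("SELECT row_id, row_data FROM table1")
with result.collect() as rows:
    for row in rows:
        print(row)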
> ------------------------------
> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
> *To: *luoyu...@alumni.sjtu.edu.cn
> *Sent: *Sunday, February 5, 2023, 6:07:02 AM
> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>
> Dear Yuxia, dev@flink.apache.org
> Thank you again for your help. I am implementing the code in Python, but I
> still have some confusion about my application.
> As I mentioned before, I am sending two simple messages (JSON) on two
> different topics.
> This is my Kafka producer class:
>
> import json
> import sys
>
> from kafka import KafkaProducer
>
> KAFKA_SERVER = "127.0.0.1:9092"  # added: referenced below but missing from the original snippet
>
> def serializer(dictionary):
>     try:
>         message = json.dumps(dictionary)
>     except Exception as e:
>         sys.stderr.write(str(e) + '\n')
>         message = str(dictionary)
>     return message.encode('utf8')
>
> def create_sample_json(row_id):
>     return {'row_id': int(row_id), 'my_data': str(int(row_id) + 7)}
>
> def do_produce(topic_name):
>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>     for row_id in range(1, 10):
>         my_data = create_sample_json(row_id)  # was data_helper.create_sample_json; the helper is defined in this module
>         producer.send(topic_name, my_data)
>     producer.flush()
>
> if __name__ == '__main__':
>     do_produce('topic1')
>     do_produce('topic2')
>
> ==================================================================================
>
> As you suggested, this is my Flink consumer, which should consume data from
> the producer and run queries on it:
>
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.expressions import col
> from pyflink.table.table_environment import StreamTableEnvironment
>
> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC
>
> KAFKA_SERVERS = 'localhost:9092'
>
> def log_processing():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env,
>                                           environment_settings=settings)
>
>     t1 = f"""
>         CREATE TEMPORARY TABLE table1(
>             row_id INT,
>             row_data STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'topic1',
>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>             'properties.group.id' = 'MY_GRP',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>     """
>
>     t2 = f"""
>         CREATE TEMPORARY TABLE table2(
>             row_id INT,
>             row_data STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'topic2',  -- was 'table2', which does not match the producer's topic name
>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>             'properties.group.id' = 'MY_GRP',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>     """
>
>     t_env.execute_sql(t1)
>     t_env.execute_sql(t2)
>
>     t3 = t_env.sql_query("SELECT row_id, row_data AS my_raw_data FROM table2")
>
>     # Please tell me what I should do next.
>
> Questions:
>
> 1) Do I need to consume the data separately in my consumer class and then
> insert it into those tables, or will the data be consumed through what we
> implemented here (since I passed the connector name, topic,
> bootstrap.servers, etc.)?
>
> 2) If so:
>
> 2.1) How can I join those streams in Python?
>
> 2.2) How can I discard the previous data, given that my procedure will send
> thousands of messages on each topic? I want to make sure I don't run
> duplicate queries.
>
> 3) If not, what should I do?
>
> Thank you very much.
>
> Amir
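On question 2.2, two notes. 'scan.startup.mode' = 'latest-offset' in the DDL above already makes each run start from new records only. For duplicates within the stream itself, one option is Flink SQL's deduplication pattern; a sketch, assuming table1's DDL were extended with a processing-time column (proc_time AS PROCTIME() is an assumed addition, not in the original DDL):

# Keep only the first row seen for each row_id; later duplicates get rn > 1
# and are filtered out.
dedup = t_env.sql_query("""
    SELECT row_id, row_data
    FROM (
        SELECT row_id, row_data,
               ROW_NUMBER() OVER (PARTITION BY row_id ORDER BY proc_time ASC) AS rn
        FROM table1
    )
    WHERE rn = 1
""")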
>
> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, Amir.
>> It may look like the following Scala code:
>>
>> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>
>> // You will need to rename the field you join on; otherwise it will throw
>> // "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
>> val t3 = tableEnv.sqlQuery("SELECT id, ssn as ssn1 FROM s2")
>> val t1 = tableEnv.from("s1")  // executeSql returns a TableResult, not a Table, so look the table up by name
>> val result = t1.join(t3).where($"ssn" === $"ssn1")
>>
>> Also, you can refer here for more detail [1].
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>>
>> Best regards,
>> Yuxia
>>
>> ----- Original Message -----
>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>> To: "dev" <dev@flink.apache.org>
>> Sent: Friday, February 3, 2023, 4:45:08 AM
>> Subject: Need help how to use Table API to join two Kafka streams
>>
>> Hello,
>>
>> I have a Kafka producer and a Kafka consumer that produce and consume
>> multiple data records, respectively. You can think of two data sets here.
>> Both datasets have a similar structure but carry different data.
>>
>> I want to use the Table API to join the two Kafka streams while I consume
>> them. For example, data1.ssn == data2.ssn
>>
>> Constraints:
>> I don't want to change my producer or use FlinkKafkaProducer.
>>
>> Thank you very much.
>>
>> Best,
>> Amir
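For the Python side of this thread, a rough PyFlink equivalent of the Scala sketch above; a sketch under the assumption that s1 and s2 were created with the Kafka connector DDL shown, not a tested implementation:

from pyflink.table.expressions import col

# Rename ssn on one side first to avoid the ambiguous-column error noted above.
t3 = t_env.sql_query("SELECT id, ssn AS ssn1 FROM s2")
t1 = t_env.from_path("s1")
result = t1.join(t3).where(col("ssn") == col("ssn1"))

# For quick inspection only; a long-running job would typically INSERT INTO a sink instead.
result.execute().print()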