Thank you for your reply. I tied it with a sample stream but it did not
work. I am trying to get the results from my producer here with a very
simple query. I want to see results in the console/output.

This is my code:

// Docker: docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS:
PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0



// Producer

import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = "127.0.0.1:9092"

def serializer(dictionary):
try:
message = json.dumps(dictionary)
except Exception as e:
sys.stderr.write(str(e) + '\n')
message = str(dictionary)
return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id':int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer
=serializer)
for raw_id in range(1,10):
empad_json = data_helper.create_sample_empad_json(raw_id)
producer.send('EMPAD', empad_json)

        producer.flush()

if __name__ == '__main__':
    do_produce(XRD_PATH)


// Flink

from pyflink.datastream.stream_execution_environment import
StreamExecutionEnvironment
from pyflink.table import  EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    
env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    
env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=settings)

    t1 = f"""
            CREATE TEMPORARY TABLE raw_table(
                raw_id INT,
                raw_data STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'EMPAD',
              'properties.bootstrap.servers' = 'localhost:9092',
              'properties.group.id' = 'MY_GRP',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")

    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()



getting this error message:

pyflink.util.exceptions.TableException:
org.apache.flink.table.api.TableException: Failed to execute sql
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'collect'.
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
        at 
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
        at 
org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
        ... 13 more


Best,

Amir


On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, thanks for reaching me out.
> For your question, you don't need to cosume data in my cosumer class
> seperately and then insert them into those tables. The data will
> be consumed from what we implemented here.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *发件人: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
> *收件人: *luoyu...@alumni.sjtu.edu.cn
> *发送时间: *星期日, 2023年 2 月 05日 上午 6:07:02
> *主题: *Re: Need help how to use Table API to join two Kafka streams
>
> Dear Yuxia, dev@flink.apache.org
> Thank you again for your help. I am implementing code in Python. But I am
> still have some confusion about my application.
> As I mentioned before, I am sending two simple messages (JSON) on two
> different topics:
> This is my Kafka producer class:
>
> import json
> import sys
>
> from kafka import KafkaProducer
>
> def serializer(dictionary):
>     try:
>         message = json.dumps(dictionary)
>     except Exception as e:
>         sys.stderr.write(str(e) + '\n')
>         message = str(dictionary)
>     return message.encode('utf8')
>
> def create_sample_json(row_id):
>     return {'row_id':int(row_id), 'my_data': str(int(row_id) + 7)}
>
> def do_produce(topic_name):
>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, 
> value_serializer=serializer)
>     for row_id in range(1,10):
>         my_data = data_helper.create_sample_json(row_id)
>         producer.send(topic_name, my_data)
>         producer.flush()
>
> if __name__ == '__main__':
>     do_produce('topic1')
>     do_produce('topic2')
>
> ==================================================================================
>
> As you helped me, this is my Flink Consumer that I want to cosnume data from 
> producer and run queries on them:
>
> from pyflink.datastream.stream_execution_environment import 
> StreamExecutionEnvironment
> from pyflink.table import  EnvironmentSettings
> from pyflink.table.expressions import col
> from pyflink.table.table_environment import StreamTableEnvironment
>
> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC
>
> KAFKA_SERVERS = 'localhost:9092'
>
> def log_processing():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>     
> env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>     
> env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, 
> environment_settings=settings)
>
>     t1 = f"""
>             CREATE TEMPORARY TABLE table1(
>                 row_id INT,
>                 row_data STRING
>             ) WITH (
>               'connector' = 'kafka',
>               'topic' = 'topic1',
>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>               'properties.group.id' = 'MY_GRP',
>               'scan.startup.mode' = 'latest-offset',
>               'format' = 'json'
>             )
>             """
>
>     t2 = f"""
>             CREATE TEMPORARY TABLE table2(
>                 row_id INT,
>                 row_data STRING
>             ) WITH (
>               'connector' = 'kafka',
>               'topic' = 'table2',
>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>               'properties.group.id' = 'MY_GRP',
>               'scan.startup.mode' = 'latest-offset',
>               'format' = 'json'
>             )
>             """
>
>     t_env.execute_sql(t1)
>     t_env.execute_sql(t2)
>
>     t3 = t_env.sql_query("SELECT row_id, row_data as my_raw_data FROM table2")
>
> // please tell me what should I do next:
>
> // Questions:
>
> // 1) Do I need to cosume data in my cosumer class seperately and then insert 
> them into those tables or data will be consumed from
>
>       what we implemented here (as I passed the name of the connector, toipc, 
> bootstartap.servers, etc...)?
>
> // 2) If so:
>
>        2.1) how can I make join from those streams in Python?
>
>        2.2) How can I prevant the previous data as my rocedure will send 
> thousands messages in each topic. I want to make sure that
>
>             not to make duplicate queries.
>
> // 3) If not, what should I do?
>
>
> Thank you very much.
>
> Amir
>
>
>
>
>
>
>
> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, Amir.
>> May look like using scala code:
>>
>> val t1 = tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn
>> string) WITH ('connector' = 'kafka', ...);
>> val t2 = tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn
>> string) WITH ('connector' = 'kafka', ...);
>>
>> // you will need to rename the field to join, otherwise, it'll
>> "org.apache.flink.table.api.ValidationException: Ambiguous column name:
>> ssn".
>> val t3 = tableEnv.sqlQuery("SELECT id, ssn as ssn1 FROM s2")
>> val result = t1.join(t3).where($"ssn" === $"ssn1");
>>
>> Also, you can refer here for more detail[1].
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>>
>> Best regards,
>> Yuxia
>>
>> ----- 原始邮件 -----
>> 发件人: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>> 收件人: "dev" <dev@flink.apache.org>
>> 发送时间: 星期五, 2023年 2 月 03日 上午 4:45:08
>> 主题: Need help how to use Table API to join two Kafka streams
>>
>> Hello,
>>
>> I have a Kafka producer and a Kafka consumer that produces and consumes
>> multiple data respectively. You can think of two data sets here. Both
>> datasets have a similar structure but carry different data.
>>
>> I want to implement a Table API to join two Kafka streams while I
>> consume them. For example, data1.ssn==data2.ssn
>>
>> Constraints:
>> I don't want to change my producer or use FlinkKafkaProducer.
>>
>> Thank you very much.
>>
>> Best,
>> Amir
>>
>
>

Reply via email to