Thank you for your reply. I tried it with a sample stream, but it did not
work. I am trying to get the results from my producer here with a very
simple query, and I want to see the results in the console/output.
This is my code:
// Docker: docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
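
As a sanity check before involving Flink (a minimal sketch, assuming the
compose stack above is up and kafka-python is installed), I can confirm the
broker is reachable and see whether the EMPAD topic exists:

from kafka import KafkaConsumer

# Connect through the listener advertised as localhost:9092 above.
consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")
print("EMPAD" in consumer.topics())  # True once the producer has created the topic
consumer.close()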
// Producer
import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = "127.0.0.1:9092"

def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
                             value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = create_sample_empad_json(raw_id)  # defined above
        producer.send('EMPAD', empad_json)
    producer.flush()

if __name__ == '__main__':
    do_produce()
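
To verify that the messages actually land in the topic, independent of
Flink, a throwaway kafka-python consumer works (again a sketch;
consumer_timeout_ms just stops the loop once the topic goes quiet):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'EMPAD',
    bootstrap_servers='127.0.0.1:9092',
    auto_offset_reset='earliest',      # read the topic from the beginning
    consumer_timeout_ms=5000,          # give up after 5s of silence
    value_deserializer=lambda m: json.loads(m.decode('utf8')))

for record in consumer:
    print(record.value)  # e.g. {'raw_id': 1, 'raw_data': '8'}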
// Flink
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = """
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("SELECT raw_id, raw_data FROM raw_table")
    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()
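
(Side note: as far as I understand, with 'scan.startup.mode' = 'latest-offset'
the query only sees records produced after the Flink job is up. If the
producer has already run, switching the table to read from the beginning
should replay what is in the topic; a sketch of the one-line change in the
WITH clause:

    'scan.startup.mode' = 'earliest-offset'
)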
When I run the code above, I get this error message:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more
Best,
Amir
On Sun, Feb 5, 2023 at 8:20 PM yuxia <[email protected]> wrote:
> Hi, thanks for reaching out.
> For your question: you don't need to consume the data in your consumer
> class separately and then insert it into those tables. The data will
> be consumed by what we implemented here.
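> In other words (a minimal sketch, assuming the DDL for table1 has already
> been executed via t_env.execute_sql as in your code), the query itself
> pulls the rows from Kafka:
>
>     rows = t_env.sql_query("SELECT * FROM table1").execute()
>     with rows.collect() as results:
>         for r in results:
>             print(r)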
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Amir Hossein Sharifzadeh" <[email protected]>
> *To: *[email protected]
> *Sent: *Sunday, February 5, 2023, 6:07:02 AM
> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>
> Dear Yuxia, [email protected]
> Thank you again for your help. I am implementing the code in Python, but I
> still have some confusion about my application.
> As I mentioned before, I am sending two simple messages (JSON) on two
> different topics:
> This is my Kafka producer class:
>
> import json
> import sys
>
> from kafka import KafkaProducer
>
> KAFKA_SERVER = 'localhost:9092'
>
> def serializer(dictionary):
>     try:
>         message = json.dumps(dictionary)
>     except Exception as e:
>         sys.stderr.write(str(e) + '\n')
>         message = str(dictionary)
>     return message.encode('utf8')
>
> def create_sample_json(row_id):
>     return {'row_id': int(row_id), 'my_data': str(int(row_id) + 7)}
>
> def do_produce(topic_name):
>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
>                              value_serializer=serializer)
>     for row_id in range(1, 10):
>         my_data = create_sample_json(row_id)  # defined above
>         producer.send(topic_name, my_data)
>     producer.flush()
>
> if __name__ == '__main__':
>     do_produce('topic1')
>     do_produce('topic2')
>
> ==================================================================================
>
> As you helped me, this is my Flink consumer, which I want to use to consume
> data from the producer and run queries on it:
>
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.expressions import col
> from pyflink.table.table_environment import StreamTableEnvironment
>
> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC
>
> KAFKA_SERVERS = 'localhost:9092'
>
> def log_processing():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env,
>                                           environment_settings=settings)
>
>     # The column names match the JSON keys the producer sends (row_id, my_data).
>     t1 = f"""
>         CREATE TEMPORARY TABLE table1(
>             row_id INT,
>             my_data STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'topic1',
>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>             'properties.group.id' = 'MY_GRP',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>     """
>
>     t2 = f"""
>         CREATE TEMPORARY TABLE table2(
>             row_id INT,
>             my_data STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'topic2',
>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>             'properties.group.id' = 'MY_GRP',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>     """
>
>     t_env.execute_sql(t1)
>     t_env.execute_sql(t2)
>
>     t3 = t_env.sql_query("SELECT row_id, my_data AS my_raw_data FROM table2")
>
> // please tell me what I should do next.
>
> // Questions:
>
> // 1) Do I need to consume the data in my consumer class separately and then
> insert it into those tables, or will the data be consumed by what we
> implemented here (since I passed the connector name, topic,
> bootstrap.servers, etc.)?
>
> // 2) If so:
>
> 2.1) How can I join those streams in Python?
>
> 2.2) How can I avoid reprocessing previous data, since my procedure will send
> thousands of messages on each topic? I want to make sure not to run
> duplicate queries.
>
> // 3) If not, what should I do?
>
>
> Thank you very much.
>
> Amir
>
> On Fri, Feb 3, 2023 at 5:45 AM yuxia <[email protected]> wrote:
>
>> Hi, Amir.
>> It may look like the following Scala code:
>>
>> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>
>> // You will need to rename the join field; otherwise it will throw
>> // "org.apache.flink.table.api.ValidationException: Ambiguous column name:
>> // ssn".
>> val t1 = tableEnv.from("s1")
>> val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
>> val result = t1.join(t3).where($"ssn" === $"ssn1")
>>
>> Also, you can refer here for more detail[1].
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
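>>
>> Since you are writing Python, a rough PyFlink equivalent of the snippet
>> above (an untested sketch; it assumes the two DDL statements have already
>> been run with t_env.execute_sql) would be:
>>
>> from pyflink.table.expressions import col
>>
>> t1 = t_env.from_path("s1")
>> t3 = t_env.sql_query("SELECT id, ssn AS ssn1 FROM s2")
>> result = t1.join(t3).where(col("ssn") == col("ssn1"))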
>>
>> Best regards,
>> Yuxia
>>
>> ----- Original Message -----
>> From: "Amir Hossein Sharifzadeh" <[email protected]>
>> To: "dev" <[email protected]>
>> Sent: Friday, February 3, 2023, 4:45:08 AM
>> Subject: Need help how to use Table API to join two Kafka streams
>>
>> Hello,
>>
>> I have a Kafka producer and a Kafka consumer that produce and consume
>> multiple data streams. You can think of two datasets here: both have a
>> similar structure but carry different data.
>>
>> I want to use the Table API to join the two Kafka streams as I consume
>> them, for example on data1.ssn == data2.ssn.
>>
>> Constraints:
>> I don't want to change my producer or use FlinkKafkaProducer.
>>
>> Thank you very much.
>>
>> Best,
>> Amir
>>
>
>