Dear Team,

I am new to PyFlink and would like to request your support with an issue I am facing. I am using PyFlink 1.15.0, and my code is based on the PyFlink reference examples.

The error I am getting:

Traceback (most recent call last):
  File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
    element = received.get(timeout=1)
  File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
    raise Empty
_queue.Empty

RuntimeError: Channel closed prematurely.

My code is:
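# Job overview: consume JSON rows from the Kafka topic 'test', count records in
# 1-second tumbling windows, and keep the latest count per key in RocksDB-backed
# keyed state; checkpoints are written to a local filesystem directory.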

import json
import os
import time
from datetime import datetime

from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
    FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
from sklearn.preprocessing import LabelEncoder
import pickle
import pandas as pd
from pyflink.common import Row

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows

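# Timestamp assigner: uses the first field (time_stamp) of each row as the event timestamp.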
class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[0])

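# Window function: emits (key, number of records) for each window.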
class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        # return [(key, result)]
        return [(key, len([e for e in inputs]))]

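# Keyed process function: stores the latest windowed count per key in ValueState
# with a 7-day TTL and emits (count, current processing time).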
class Storage(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.days(7)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0
        current = value[1]
        self.state.update(current)

        yield current, time.time()

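# Builds and executes the job: Kafka JSON source -> 1-second tumbling window counts -> keyed state.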
def write_to_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)
    env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    # env.get_checkpoint_config().enable_unaligned_checkpoints()
    check = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                         'checkpoint-dir11')
    env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))
    rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                               'flink-statebackend-rocksdb_2.11-1.0.0.jar')
    env.add_jars("file:///{}".format(rocksdb_jar))
    # deserialization_schema = SimpleStringSchema()
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW_NAMED(
            ["time_stamp", "Bill_number", "Store_Code", "itemdescription", "Item_code",
             "Gross_Price", "Discount", "Net_Price", "purchaseorReturn",
             "Membership_No", "Billing_Date", "Billing_Time"],
            [Types.DOUBLE(), Types.STRING(), Types.INT(), Types.STRING(),
             Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),
             Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])).build()

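    # Kafka consumer for the 'test' topic using the JSON row schema defined above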
    kafka_consumer = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '192.168.1.37:9092', 'group.id': 'test_group1'})

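    # Assign timestamps/watermarks, key by a constant key, count records per
    # 1-second tumbling window, then track the latest count in keyed state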
    ds = env.add_source(kafka_consumer)
    # ds.print()
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    # ds = ds.map(lambda x: eval(x))

    ds1 = ds.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: '/root', key_type=Types.STRING()) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) \
        .apply(CountWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
    ds1.print()

    ds4 = ds1.key_by(lambda value: value[0]) \
        .process(Storage())
    ds4.print()
    # ds1 = ds.map(mlfunc)
    # ds1.print()

    env.execute('write_to_kafka')

 

 

if __name__ == '__main__':
    print("start writing data to kafka")
    write_to_kafka()

 

 
