Hi,

The traceback you provided might not point to the root cause of the failure. Could you please provide the complete TaskManager log?
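As a pointer (a minimal sketch, assuming the job is started locally from Python with a pip-installed apache-flink, so a mini cluster runs inside the client process), the TaskManager output usually ends up in the log directory of the PyFlink installation, which you can locate like this:

# Locate the log directory of the local PyFlink installation.
# When a job is executed directly from Python (local mini cluster),
# the TaskManager/client log files are usually written here.
import os
import pyflink

log_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'log')
print('PyFlink log directory:', log_dir)
for name in sorted(os.listdir(log_dir)):
    print('  ', name)

If the job runs on a standalone or remote cluster instead, the TaskManager logs are in the cluster's own log directory (files named like flink-*-taskexecutor-*.log).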
Best,
Shuiqiang

harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote on Thu, Jun 2, 2022 at 22:04:

> Dear Team,
>
> I am new to PyFlink and would like to request your support with an issue I am
> facing. I am using PyFlink version 1.15.0 and the PyFlink reference code.
>
> The error I am getting:
>
> Traceback (most recent call last):
>   File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
>     element = received.get(timeout=1)
>   File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
>     raise Empty
> _queue.Empty
> RuntimeError: Channel closed prematurely.
>
> My code is:
>
> import json
> import os
> import time
> from datetime import datetime
>
> from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
> from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
>     FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
> from sklearn.preprocessing import LabelEncoder
> import pickle
> import pandas as pd
> from pyflink.common import Row
>
> import argparse
> from typing import Iterable
>
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
>
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
> from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
>
>
> class MyTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[0])
>
>
> class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
>     def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
>         # return [(key, result)]
>         return [(key, len([e for e in inputs]))]
>
>
> class Storage(KeyedProcessFunction):
>
>     def __init__(self):
>         self.state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
>         state_ttl_config = StateTtlConfig \
>             .new_builder(Time.days(7)) \
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
>             .disable_cleanup_in_background() \
>             .build()
>         state_descriptor.enable_time_to_live(state_ttl_config)
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         # retrieve the current count
>         current = self.state.value()
>         if current is None:
>             current = 0
>         current = value[1]
>         self.state.update(current)
>
>         yield current, time.time()
>
>
> def write_to_kafka():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.enable_checkpointing(1000)
>     env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
>     env.set_state_backend(EmbeddedRocksDBStateBackend())
>     env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
>     # env.get_checkpoint_config().enable_unaligned_checkpoints()
>     check = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'checkpoint-dir11')
>     env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>     rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flink-statebackend-rocksdb_2.11-1.0.0.jar')
>     env.add_jars("file:///{}".format(rocksdb_jar))
>     # deserialization_schema = SimpleStringSchema()
>     deserialization_schema = JsonRowDeserializationSchema.builder() \
>         .type_info(type_info=Types.ROW_NAMED(
>             ["time_stamp", "Bill_number", "Store_Code", "itemdescription", "Item_code",
>              "Gross_Price", "Discount", "Net_Price", "purchaseorReturn",
>              "Membership_No", "Billing_Date", "Billing_Time"],
>             [Types.DOUBLE(), Types.STRING(), Types.INT(), Types.STRING(),
>              Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),
>              Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])).build()
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': '192.168.1.37:9092', 'group.id': 'test_group1'})
>
>     ds = env.add_source(kafka_consumer)
>     # ds.print()
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(MyTimestampAssigner())
>     # ds = ds.map(lambda x: eval(x))
>
>     ds1 = ds.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: '/root', key_type=Types.STRING()) \
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) \
>         .apply(CountWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds1.print()
>     ds4 = ds1.key_by(lambda value: value[0]) \
>         .process(Storage())
>     ds4.print()
>     # ds1 = ds.map(mlfunc)
>     # ds1.print()
>
>     env.execute('write_to_kafka')
>
>
> if __name__ == '__main__':
>     print("start writing data to kafka")
>     write_to_kafka()