None of the attachments is a TaskManager log. The TaskManager log
should be located in the directory
`E:\pythonProject16\lib\site-packages\pyflink\log`.

On Fri, Jun 3, 2022 at 8:41 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

>
>
>
>
> *From:* Shuiqiang Chen [mailto:acqua....@gmail.com]
> *Sent:* Friday, June 3, 2022 2:11 PM
> *To:* harshit.varsh...@iktara.ai
> *Cc:* user
> *Subject:* Re: Issue Facing While Using EmbeddedRocksDbCheckpointing
> FlinkVersion(1.15.0)
>
>
>
> Hi,
>
>
>
> I guess that the traceback you provided might not be the root cause of
> the failure. Could you please provide the complete log of the TaskManager?
>
>
>
> Best,
>
> Shuiqiang
>
>
>
> harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> 于2022年6月2日周四 22:04
> 写道:
>
> Dear Team,
>
>
>
> I am new to PyFlink and request your support with an issue I am facing.
> I am using PyFlink version 1.15.0 with code taken from the PyFlink
> reference code.
>
> The errors I am getting are:
>
> Traceback (most recent call last):
>
>   File
> "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py",
> line 470, in input_elements
>
>     element = received.get(timeout=1)
>
>   File
> "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line
> 178, in get
>
>     raise Empty
>
> _queue.Empty
>
> RuntimeError: Channel closed prematurely.
>
> My code is:
>
> import json
>
> import os
>
> import time
>
> from datetime import datetime
>
>
>
> from pyflink.common import SimpleStringSchema,
> JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
>
> from pyflink.datastream import StreamExecutionEnvironment, WindowFunction,
> HashMapStateBackend, CheckpointingMode, \
>
>     FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext,
> EmbeddedRocksDBStateBackend, RocksDBStateBackend, \
>
>     ExternalizedCheckpointCleanup
>
> from pyflink.datastream.connectors import FlinkKafkaConsumer,
> FlinkKafkaProducer
>
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig,
> ListStateDescriptor
>
> from sklearn.preprocessing import LabelEncoder
>
> import pickle
>
> import pandas as pd
>
> from pyflink.common import Row
>
>
>
> import argparse
>
> from typing import Iterable
>
>
>
> from pyflink.datastream.connectors import FileSink, OutputFileConfig,
> RollingPolicy
>
>
>
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
>
> from pyflink.common.watermark_strategy import TimestampAssigner
>
> from pyflink.datastream import StreamExecutionEnvironment,
> ProcessWindowFunction
>
> from pyflink.datastream.window import TumblingEventTimeWindows,
> TimeWindow, TumblingProcessingTimeWindows
>
>
>
>
>
> class MyTimestampAssigner(TimestampAssigner):
>
>     def extract_timestamp(self, value, record_timestamp) -> int:
>
>         return int(value[0])
>
>
>
>
>
> class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
>
>     def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
>
>         # return [(key, result)]
>
>         return [(key, len([e for e in inputs]))]
>
>
>
>
>
> class Storage(KeyedProcessFunction):
>
>
>
>     def __init__(self):
>
>         self.state = None
>
>
>
>     def open(self, runtime_context: RuntimeContext):
>
>         state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
>
>         state_ttl_config = StateTtlConfig \
>
>             .new_builder(Time.days(7)) \
>
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
>
>             .disable_cleanup_in_background() \
>
>             .build()
>
>         state_descriptor.enable_time_to_live(state_ttl_config)
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>
>         # retrieve the current count
>
>         current = self.state.value()
>
>         if current is None:
>
>             current = 0
>
>         current = value[1]
>
>         self.state.update(current)
>
>
>
>         yield current,time.time()
>
>
>
>
>
> def write_to_kafka():
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     env.set_parallelism(1)
>
>     env.enable_checkpointing(10000)
>
>     env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
>
>
> env.set_state_backend(EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))
>
>
> env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
>
>     env.get_checkpoint_config().enable_unaligned_checkpoints()
>
>     env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
>
>     env.get_checkpoint_config().set_checkpoint_timeout(50000)
>
>
> env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
>     check = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>                          'checkpoint-dir11')
>
>
> env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
>
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>
>     env.add_jars("file:///{}".format(kafka_jar))
>
>     rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>                              'flink-statebackend-rocksdb_2.11-1.0.0.jar')
>
>     env.add_jars("file:///{}".format(rocksdb_jar))
>
>     # deserialization_schema = SimpleStringSchema()
>
>     deserialization_schema = JsonRowDeserializationSchema.builder() \
>
>         .type_info(type_info=Types.ROW_NAMED(["time_stamp",
>
>                                               "Bill_number", "Store_Code",
> "itemdescription", "Item_code",
>
>                                               "Gross_Price", "Discount",
> "Net_Price",
>
>                                               "purchaseorReturn",
>
>                                               "Membership_No",
> "Billing_Date", "Billing_Time"],
>
>                                              [Types.DOUBLE(),
> Types.STRING(), Types.INT(), Types.STRING(),
>
>                                               Types.STRING(),
> Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),
>
>                                               Types.STRING(),
> Types.STRING(), Types.STRING(),
>
>                                               Types.STRING()])).build()
>
>
>
>     kafka_consumer = FlinkKafkaConsumer(
>
>         topics='test',
>
>         deserialization_schema=deserialization_schema,
>
>         properties={'bootstrap.servers': '192.168.1.37:9092', 'group.id':
> 'test_group1'})
>
>
>
>     ds = env.add_source(kafka_consumer)
>
>     # ds.print()
>
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>
>         .with_timestamp_assigner(MyTimestampAssigner())
>
>     # ds = ds.map(lambda x: eval(x))
>
>
>
>     ds1 = ds.assign_timestamps_and_watermarks(watermark_strategy) \
>
>         .key_by(lambda x: '/root', key_type=Types.STRING()) \
>
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) \
>
>         .apply(CountWindowFunction(), Types.TUPLE([Types.STRING(),
> Types.INT()]))
>
>     ds1.print()
>
>     ds4 = ds1.key_by(lambda value: value[0]) \
>
>         .process(Storage())
>
>     ds4.print()
>
>     # ds1 = ds.map(mlfunc)
>
>     # ds1.print()
>
>
>
>     env.execute('write_to_kafka')
>
>
>
>
>
> if __name__ == '__main__':
>
>     print("start writing data to kafka")
>
>     write_to_kafka()
>
>

Reply via email to