Hi,

From the TaskManager's log we can find the following exception stack trace. It looks like an operating-system-level problem loading the native RocksDB library.

2022-06-04 14:45:53,878 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - KEYED PROCESS, Map -> Sink: Print to Std. Out (1/1) (3a295a887361f4f87d2eaf79e901d056) switched from INITIALIZING to FAILED on 2f04093f-a18f-4c2c-b02e-6d1e6ece677e @ kubernetes.docker.internal (dataPort=-1).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for PythonKeyedProcessOperator_ce1331762e4644ef89dbdbc15321f049_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    ... 11 more
Caused by: java.io.IOException: Could not load the native RocksDB library
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:882) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:402) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:93) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    ... 11 more
Caused by: java.lang.UnsatisfiedLinkError: C:\Users\Admin\AppData\Local\Temp\rocksdb-lib-cfe23cb49ce486a76d1126db2517e679\librocksdbjni-win64.dll: A dynamic link library (DLL) initialization routine failed
    at java.lang.ClassLoader$NativeLibrary.load(Native Method) ~[?:1.8.0_202]
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) ~[?:1.8.0_202]
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) ~[?:1.8.0_202]
    at java.lang.Runtime.load0(Runtime.java:809) ~[?:1.8.0_202]
    at java.lang.System.load(System.java:1086) ~[?:1.8.0_202]
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:79) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:57) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:856) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:402) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:93) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    ... 11 more
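The last "Caused by" is the interesting part: Windows failed to run the
initialization routine of librocksdbjni-win64.dll. A common cause of "A
dynamic link library (DLL) initialization routine failed" for RocksDB's JNI
library is a missing Microsoft Visual C++ Redistributable on the machine;
that is an assumption worth verifying, not something the log itself proves.
One way to check is to load the extracted DLL directly, outside of Flink. A
minimal sketch, assuming the temp file from the stack trace still exists:

    import ctypes

    # Path copied verbatim from the UnsatisfiedLinkError above; rerun the job
    # first if the temp directory has already been cleaned up.
    dll_path = (r"C:\Users\Admin\AppData\Local\Temp"
                r"\rocksdb-lib-cfe23cb49ce486a76d1126db2517e679"
                r"\librocksdbjni-win64.dll")

    # ctypes raises OSError with the Windows loader's message if the DLL (or
    # one of its dependencies, e.g. the VC++ runtime) fails to initialize.
    ctypes.CDLL(dll_path)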

harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote on Thu, Jun 9, 2022 at 15:06:

>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, June 6, 2022 6:47 AM
> *To:* harshit.varsh...@iktara.ai
> *Cc:* user
> *Subject:* Re: FW: Issue Facing While Using EmbeddedRocksDbCheckpointing
> FlinkVersion(1.15.0)
>
> None of the attachments are logs of the TaskManager. The TaskManager log
> should be located in the directory
> `E:\pythonProject16\lib\site-packages\pyflink\log`.
>
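> A quick way to print that directory from the Python environment the job
> runs in (a sketch, assuming a standard pip install of apache-flink):
>
>     import os
>     import pyflink
>
>     # TaskManager logs are written to the "log" directory next to the
>     # installed pyflink package.
>     print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "log"))
>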
> On Fri, Jun 3, 2022 at 8:41 PM harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote:
>
> *From:* Shuiqiang Chen [mailto:acqua....@gmail.com]
> *Sent:* Friday, June 3, 2022 2:11 PM
> *To:* harshit.varsh...@iktara.ai
> *Cc:* user
> *Subject:* Re: Issue Facing While Using EmbeddedRocksDbCheckpointing
> FlinkVersion(1.15.0)
>
> Hi,
>
> I guess the traceback you provided might not show the root cause of the
> failure. Could you please provide the complete log of the TaskManager?
>
> Best,
>
> Shuiqiang
>
> harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote on Thu, Jun 2, 2022 at 22:04:
>
> Dear Team,
>
> I am new to PyFlink and would like to ask for your support with an issue I
> am facing. I am using PyFlink version 1.15.0, and my code is based on the
> PyFlink reference examples.
>
> The errors I am getting:
>
> Traceback (most recent call last):
>   File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
>     element = received.get(timeout=1)
>   File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
>     raise Empty
> _queue.Empty
>
> RuntimeError: Channel closed prematurely.
>
> My code is:
>
> import json
> import os
> import time
> from datetime import datetime
>
> from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
> from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
>     FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend, \
>     ExternalizedCheckpointCleanup
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
> from sklearn.preprocessing import LabelEncoder
> import pickle
> import pandas as pd
> from pyflink.common import Row
>
> import argparse
> from typing import Iterable
>
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
>
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
> from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
>
>
> class MyTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[0])
>
>
> class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
>     def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
>         # return [(key, result)]
>         return [(key, len([e for e in inputs]))]
>
>
> class Storage(KeyedProcessFunction):
>
>     def __init__(self):
>         self.state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
>         state_ttl_config = StateTtlConfig \
>             .new_builder(Time.days(7)) \
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
>             .disable_cleanup_in_background() \
>             .build()
>         state_descriptor.enable_time_to_live(state_ttl_config)
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         # retrieve the current count
>         current = self.state.value()
>         if current is None:
>             current = 0
>         current = value[1]
>         self.state.update(current)
>
>         yield current, time.time()
>
>
> def write_to_kafka():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.enable_checkpointing(10000)
>     env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
>     env.set_state_backend(EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))
>     env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
>     env.get_checkpoint_config().enable_unaligned_checkpoints()
>     env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
>     env.get_checkpoint_config().set_checkpoint_timeout(50000)
>     env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     check = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                          'checkpoint-dir11')
>     env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>     rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                                'flink-statebackend-rocksdb_2.11-1.0.0.jar')
>     env.add_jars("file:///{}".format(rocksdb_jar))
>
>     # deserialization_schema = SimpleStringSchema()
>     deserialization_schema = JsonRowDeserializationSchema.builder() \
>         .type_info(type_info=Types.ROW_NAMED(
>             ["time_stamp", "Bill_number", "Store_Code", "itemdescription",
>              "Item_code", "Gross_Price", "Discount", "Net_Price",
>              "purchaseorReturn", "Membership_No", "Billing_Date", "Billing_Time"],
>             [Types.DOUBLE(), Types.STRING(), Types.INT(), Types.STRING(),
>              Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),
>              Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])).build()
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': '192.168.1.37:9092', 'group.id': 'test_group1'})
>
>     ds = env.add_source(kafka_consumer)
>     # ds.print()
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(MyTimestampAssigner())
>     # ds = ds.map(lambda x: eval(x))
>
>     ds1 = ds.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: '/root', key_type=Types.STRING()) \
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) \
>         .apply(CountWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds1.print()
>     ds4 = ds1.key_by(lambda value: value[0]) \
>         .process(Storage())
>     ds4.print()
>     # ds1 = ds.map(mlfunc)
>     # ds1.print()
>
>     env.execute('write_to_kafka')
>
>
> if __name__ == '__main__':
>     print("start writing data to kafka")
>     write_to_kafka()
>
