PyFlink Expected IPC message of type schema but got record batch

2021-01-30 Post 苗红宾
Hi:

Hope you are doing well! I have a question about PyFlink; details are below:

Use case: a sliding window of 10 minutes that slides every 5 minutes for data
aggregation, followed by further processing; roughly 2 GB of data and about 1 million records per window.

Job params:

bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
-Dyarn.containers.vcores=4 \
-Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
-Dtaskmanager.memory.managed.fraction=0.7 \
-Dtaskmanager.memory.task.off-heap.size=5120m \
-nm $task_name -qu $queue -d


The exception message is as follows:

Traceback (most recent call last):
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 253, in _execute
response = task()
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 310, in 
lambda: self.create_worker().do_instruction(request), request)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 515, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 978, in process_bundle
element.data)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 218, in process_encoded
self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 627, in decode_from_stream
yield self._decode_one_batch_from_stream(in_stream, 
in_stream.read_var_int64())
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 638, in _decode_one_batch_from_stream
return arrow_to_pandas(self._timezone, self._field_types, 
[next(self._batch_reader)])
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 631, in _load_from_stream
reader = pa.ipc.open_stream(stream)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 137, in open_stream
return RecordBatchStreamReader(source)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 61, in __init_
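
For reference, a minimal PyFlink sketch (assuming Flink 1.11+; the option values are illustrative, not this job's actual settings) of how to shrink the Arrow record batches exchanged between the JVM and the Python worker, which is the IPC stream the decoder in the traceback above is reading:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Hypothetical setup; in the real job these objects already exist.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
conf = t_env.get_config().get_configuration()
# Fewer rows per Arrow batch / per bundle sent to the Python worker.
conf.set_string("python.fn-execution.arrow.batch.size", "1000")
conf.set_string("python.fn-execution.bundle.size", "1000")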

Re: Checkpoint succeeds, but restoring from the checkpoint fails. The job uses Guava's BloomFilter; can anyone help analyze this?

2021-01-30 Post Kezhu Wang
For custom state, define an explicit type serializer; you could try SimpleVersionedSerializer, Avro, Protobuf,
etc.

Complex state should avoid serializers that rely on reflection (the same goes for Java Serializable); none of these handle state upgrades well.

On January 31, 2021 at 11:29:25, 赵一旦 (hinobl...@gmail.com) wrote:

Does anyone know about this issue?
I have found the cause: the LongAdder class is not public. But other than overriding (shading) this class, is there another solution? Does Flink offer anything for this situation?
After all, the classes a job needs are not necessarily user-defined, so there is no guarantee they follow such rules. For the non-public Guava LongAdder here, overriding the class does work for me; I just don't know whether there is another way.


赵一旦 wrote on Thursday, January 28, 2021 at 6:03 PM:

> As shown below, I use Guava's BloomFilter, which appears to be serialized via Kryo. The checkpoint succeeds, but restoring the job from the checkpoint fails.
> The error stack is below; the key error seems to be about being unable to access a public member?
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:248)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:400)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:507)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:531)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5
> /30) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:316)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:155)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:116)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:540)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:100)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:178)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:299)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: com.google.common.hash.LongAdder
> Serialization trace:
> bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
> bits (com.google.common.hash.BloomFilter)
> at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
> FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
> .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.
> KeyGroupPartitioner$PartitioningResultKeyGroupReader
> .readMappingsInKeyGroup(KeyGroupPartitioner.java:29

Re: Request for advice

2021-01-30 Post Shengkai Fang
hi,

According to the docs [1][2], you can pass the corresponding settings in the WITH clause to get through authentication, e.g.

'properties.sasl.kerberos.service.name' = 'xyz'

[1]
https://kafka.apache.org/documentation/#brokerconfigs_sasl.kerberos.service.name
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#properties
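
A minimal PyFlink sketch of such a table definition (assuming the Flink 1.12 SQL Kafka connector; the topic, broker address, and service name are placeholders):

# st_env is assumed to be an existing StreamTableEnvironment.
# The 'properties.*' keys are forwarded to the Kafka client as described in [1][2].
st_env.execute_sql("""
    CREATE TABLE kafka_source (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'broker1:21007',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'format' = 'json'
    )
""")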

瞿叶奇 <389243...@qq.com> wrote on Sunday, January 31, 2021 at 2:26 PM:

> Hello, I am a developer on the State Grid Shaanxi metering data collection system. As part of an architecture overhaul we plan to use pyflink
> to write real-time Kafka data to HDFS. My Kafka cluster uses Kerberos authentication, and I have not been able to connect yet. Could you give me a sample?


Is it possible to join a Hive stream with another Hive stream?

2021-01-30 Post macdoor
The concrete requirement: the collected total traffic of a channel is written into a Hive table every 5 minutes. To get the traffic of that channel within each 5-minute interval, the two consecutive totals have to be subtracted. I would like to join the same Hive
table with itself, forming a join of two Hive streams. Can this be done, or is there another way to achieve it?
Right now I do this as a batch job scheduled by crontab and would like to switch to stream mode.

select p1.traffic - p2.traffic
from p as p1
inner join p as p2 on p1.id = p2.id and p1.time = p2.time + INTERVAL '5' MINUTE
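
A rough PyFlink wrapper for the same idea, illustrative only: it assumes the Hive table is read in streaming mode and that `time` is an event-time attribute (otherwise this is a regular stream-stream join whose state grows without bound):

# st_env is assumed to be an existing StreamTableEnvironment with the Hive catalog registered.
result = st_env.sql_query("""
    SELECT p1.traffic - p2.traffic AS traffic_5min
    FROM p AS p1
    JOIN p AS p2
      ON p1.id = p2.id
     AND p1.`time` = p2.`time` + INTERVAL '5' MINUTE
""")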



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Request for advice

2021-01-30 Post 瞿叶奇
Hello, I am a developer on the State Grid Shaanxi metering data collection system. As part of an architecture overhaul we plan to use pyflink
to write real-time Kafka data to HDFS. My Kafka cluster uses Kerberos authentication, and I have not been able to connect yet. Could you give me a sample?

Using pyflink to consume from a Kerberos-authenticated Kafka

2021-01-30 Post 瞿叶奇
Below is my program that consumes from Kafka and writes to a CSV file:
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, \
    CsvTableSink, WriteMode, SqlDialect
from pyflink.table.descriptors import FileSystem, OldCsv, Schema, Kafka, Json

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# Register the Kerberos-secured Kafka topic as a table source.
(st_env.connect(Kafka().version("universal")
                .topic("qyq13")
                .start_from_earliest()
                .property("zookeeper.connect",
                          "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002")
                .property("security.protocol", 'SASL_PLAINTEXT')
                .property("sasl.kerberos.service.name", 'kafka')
                .property("kerberos.domain.name", 'hadoop.hadoop.com')
                .property("bootstrap.servers",
                          "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007"))
    .with_format(Json().fail_on_missing_field(False)
                 .schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                        DataTypes.FIELD("name", DataTypes.STRING())])))
    .with_schema(Schema().field("id", DataTypes.BIGINT())
                 .field("name", DataTypes.STRING()))
    .register_table_source("sourceKafka"))
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.BIGINT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1,
                       WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
resultQuery = st_env.sql_query("select id,name from sourceKafka")
resultQuery.insert_into("csvTableSink")
st_env.execute("pyflink-kafka-v2")

Re: Checkpoint succeeds, but restoring from the checkpoint fails. The job uses Guava's BloomFilter; can anyone help analyze this?

2021-01-30 Post 赵一旦
Does anyone know about this issue?
I have found the cause: the LongAdder class is not public. But other than overriding (shading) this class, is there another solution? Does Flink offer anything for this situation?
After all, the classes a job needs are not necessarily user-defined, so there is no guarantee they follow such rules. For the non-public Guava LongAdder here, overriding the class does work for me; I just don't know whether there is another way.

赵一旦 wrote on Thursday, January 28, 2021 at 6:03 PM:

> As shown below, I use Guava's BloomFilter, which appears to be serialized via Kryo. The checkpoint succeeds, but restoring the job from the checkpoint fails.
> The error stack is below; the key error seems to be about being unable to access a public member?
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:248)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:400)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:507)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:531)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5
> /30) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:316)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:155)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:116)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:540)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:100)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:178)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:299)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: com.google.common.hash.LongAdder
> Serialization trace:
> bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
> bits (com.google.common.hash.BloomFilter)
> at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
> FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
> .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.
> KeyGroupPartitioner$PartitioningResultKeyGroupReader
> .readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperatio

Re: Persisting the catalog (Kafka connector DDL) to the Hive metastore: the group.id stays the same

2021-01-30 Post JasonLee
hi

The community already provides a way to override table options dynamically; for usage see https://mp.weixin.qq.com/s/nWKVGmAtENlQ80mdETZzDw
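
A minimal PyFlink sketch of that feature (assuming Flink 1.11+; the table and group names are placeholders):

# st_env is assumed to be an existing StreamTableEnvironment.
# SQL hints for dynamic table options are disabled by default and must be enabled first.
st_env.get_config().get_configuration().set_string(
    "table.dynamic-table-options.enabled", "true")
# Override the consumer group of the catalog-persisted Kafka table for this query only.
result = st_env.sql_query(
    "SELECT * FROM kafka_table /*+ OPTIONS('properties.group.id' = 'job_a_group') */")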



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: End-to-end latency monitoring

2021-01-30 Post JasonLee
hi

When consuming from Kafka you can read the timestamp at which each record entered Kafka, take another timestamp in the final sink, and report
the difference as a custom metric.
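
A rough sketch of that idea as a Python UDF (it assumes Flink 1.12 with PyFlink UDF metrics, and that the Kafka ingestion timestamp is exposed as a BIGINT column, e.g. via the connector's timestamp metadata; all names are illustrative):

import time
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class E2ELatency(ScalarFunction):
    def open(self, function_context):
        # Distribution metric recording per-record end-to-end latency in milliseconds.
        self.latency = function_context.get_metric_group().distribution("e2e_latency_ms")

    def eval(self, kafka_ts_ms):
        latency_ms = int(time.time() * 1000) - kafka_ts_ms
        self.latency.update(latency_ms)
        return latency_ms

# Register and apply it in the query that feeds the final sink, e.g.
# SELECT e2e_latency(kafka_ts) FROM source_table
e2e_latency = udf(E2ELatency(), result_type=DataTypes.BIGINT())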



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Watermark cannot advance when none of the Kafka partitions has data

2021-01-30 Post JasonLee
hi

Watermarks are generated from the timestamps in the data. If none of the partitions has any data, why would the watermark need to advance?
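
One related knob, for reference: when only some partitions go idle while others still carry data, the source idle-timeout lets the overall watermark keep advancing (it does not help when every partition is empty). A one-line PyFlink sketch, assuming Flink 1.11+ and an illustrative value, with st_env an existing StreamTableEnvironment:

# Mark source subtasks idle after 30 s without data so they stop holding back the watermark.
st_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "30 s")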



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Help with PyFlink connecting to a Kerberos-authenticated Kafka

2021-01-30 Post Appleyuchi
Add me on WeChat (appleyuchi)
and I will help you sort out this problem.










On 2021-01-30 17:53:00, "瞿叶奇" <389243...@qq.com> wrote:

Hello teacher,
The error message I get when using pyflink to connect to Kafka is as follows:

My flink-conf.yaml is already configured with the required settings, and my Kerberos account is valid.



My jaas.conf file is as follows:


How can I solve this? I am currently stuck here with no progress.

pyflink gives the error message "Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set"

2021-01-30 Post 瞿叶奇
  Hello! The error message I received when using pyflink to connect to
Kafka is as follows:

My flink-conf.yaml has been configured with the necessary information, and my
kerberos account is valid, as shown below:




My jaas.conf file is as follows:

I have also tried adding parameters, but it has not been resolved. Like below:
pyflink-shell.sh local
-Djava.security.auth.login.config=/opt/client2/Flink/flink/conf/jaas.conf
How can I solve it? I am currently stuck here with no progress.

Re: Help with PyFlink connecting to a Kerberos-authenticated Kafka

2021-01-30 Post Evan
??  ??


From: 瞿叶奇
Sent: 2021-01-30 17:53
To: user-zh
Subject: Help with PyFlink connecting to a Kerberos-authenticated Kafka
Hello!
The error message I get when using pyflink to connect to Kafka is as follows:

My flink-conf.yaml is already configured with the required settings, and my Kerberos account is valid.


My jaas.conf file is as follows:

How can I solve this? I am currently stuck here with no progress.


Help with PyFlink connecting to a Kerberos-authenticated Kafka

2021-01-30 Post 瞿叶奇
Hello!
The error message I get when using pyflink to connect to Kafka is as follows:

My flink-conf.yaml is already configured with the required settings, and my Kerberos account is valid.




My jaas.conf file is as follows:


How can I solve this? I am currently stuck here with no progress.

Re: Looking for a settings.xml that can build the Flink source

2021-01-30 Post Appleyuchi












That is because those jars do not exist (for the versions requested).
Check the versions of those jars.





On 2021-01-30 16:38:54, "徐州州" <25977...@qq.com> wrote:
>I have been building the latest Flink lately and keep hitting a few jars that Maven cannot download; could someone share a settings.xml file?


Looking for a settings.xml that can build the Flink source

2021-01-30 Post 徐州州
I have been building the latest Flink lately and keep hitting a few jars that Maven cannot download; could someone share a settings.xml file?

Re: Help needed (PyFlink)

2021-01-30 Post Shuiqiang Chen
Sorry, I missed the documentation link:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#enabling-kerberos-authentication

Shuiqiang Chen wrote on Saturday, January 30, 2021 at 4:32 PM:

> Hi,
> Following that document, have you configured the two properties security.kerberos.login.keytab and
> security.kerberos.login.principal in flink-conf.yaml?
> Also, is the jaas.conf file accessible on every machine where a task manager runs?
>
> 瞿叶奇 <389243...@qq.com> wrote on Saturday, January 30, 2021 at 4:15 PM:
>
>> Hello teacher,
>> 1) The error message is in the attachment (the file flink-root-python-node-master1aSdM.log). The error is:
>> Caused by: java.lang.IllegalArgumentException: Could not find a
>> 'KafkaClient' entry in the JAAS configuration. System property
>> 'java.security.auth.login.config' is not set  at
>> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
>> 2) This is the jaas.conf from the Flink configuration
>> 3) PySpark consuming Kafka and writing to HDFS works fine; this is Spark's jaas.conf
>> 3.1) This is the jaas.conf from the Spark configuration
>>
>> 3.2) This is the jaas-zk.conf from the Spark configuration
>>
>>
>> Do I need to modify this file? The fixes Baidu turns up are all for Java, and I do not know how this one should be changed.
>>
>>
>> -- Original message --
>> *From:* "user-zh" ;
>> *Sent:* Saturday, January 30, 2021 at 3:49 PM
>> *To:* "user-zh";
>> *Subject:* Re: Help needed (PyFlink)
>>
>> Hi,
>> You can check the log of the task manager running the source task to see whether the consumer successfully obtained the Kafka
>> partition metadata and whether authentication succeeded.
>>
>> 瞿叶奇 <389243...@qq.com> wrote on Saturday, January 30, 2021 at 3:14 PM:
>>
>> > Hello teacher, there is no problem at all with consuming; the command line consumes the data normally.
>> >
>> >
>> >
>> >
>> > -- Original message --
>> > *From:* "user-zh" ;
>> > *Sent:* Saturday, January 30, 2021 at 3:08 PM
>> > *To:* "user-zh";
>> > *Subject:* Re: Help needed (PyFlink)
>> >
>> > First check whether the Kafka data can be consumed from the command line.
>> >
>> > Make sure consumption works from the command line before using Flink.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On 2021-01-30 14:25:57, "瞿叶奇" <389243...@qq.com> wrote:
>> >
>> >
>> Hello teacher, I want to use Flink to consume from Kafka and write a local CSV file. The problem right now is that both the Flink and Kafka clusters require Kerberos authentication, and I am using pyflink. The program runs without errors, but it does not consume any data: the CSV file has no results, while its modification time keeps being updated. I suspect the Kafka connection
>> > has a problem and hope you can help clear this up.
>> > 1) Kafka producing data:
>> >
>> > 2) The pyflink program
>> >
>> >
>> > #!/usr/bin/python3.7
>> > # -*- coding: UTF-8 -*-
>> > from pyflink.datastream import StreamExecutionEnvironment,
>> > CheckpointingMode
>> > from pyflink.table import StreamTableEnvironment, TableConfig,
>> DataTypes,
>> > CsvTableSink, WriteMode, SqlDialect
>> > from pyflink.table.descriptors import
>> FileSystem,OldCsv,Schema,Kafka,Json
>> > s_env = StreamExecutionEnvironment.get_execution_environment()
>> > s_env.set_parallelism(1)
>> > s_env.enable_checkpointing(3000)
>> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
>> > st_env.use_catalog("default_catalog")
>> > st_env.use_database("default_database")
>> >
>> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
>> ").property("security.protocol",
>> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
>> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
>> ').property("bootstrap.servers",
>> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
>> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>> > DataTypes.BIGINT()),DataTypes.FIELD("name",
>> > DataTypes.STRING())]))).with_schema(Schema().field("id",
>> > DataTypes.BIGINT()).field("name",
>> > DataTypes.STRING())).register_table_source("sourceKafka")
>> > fieldNames = ["id", "name"]
>> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
>> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
>> ",",
>> > 1, WriteMode.OVERWRITE)
>> > st_env.register_table_sink("csvTableSink", csvSink)
>> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
>> > resultQuery = resultQuery.insert_into("csvTableSink")
>> > st_env.execute("pyflink-kafka-v2")
>> > 3)pyflink-shell.sh local
>> >
>> > 4) Run result
>> > While the program runs in pyflink-shell local, I start the producer to generate data and then check the result file, as follows:
>> >
>> >
>> >
>> You can see the file really is being updated (its modification time keeps changing) but it stays empty. On the one hand I hope pyflink can add the ability to print to the console; on the other hand I hope you can provide an example of connecting to a Kerberos-authenticated Kafka. I am an engineer on the new architecture overhaul of the Shaanxi State Grid electricity consumption data collection system, and we plan to use Flink to test Kafka-to-HDFS data transfer. I hope you can give us an example to help us complete the test.
>> >
>> >
>> >
>> >
>>
>


Re: Help needed (PyFlink)

2021-01-30 Post Shuiqiang Chen
Hi,
Following that document, have you configured the two properties security.kerberos.login.keytab and
security.kerberos.login.principal in flink-conf.yaml?
Also, is the jaas.conf file accessible on every machine where a task manager runs?

瞿叶奇 <389243...@qq.com> wrote on Saturday, January 30, 2021 at 4:15 PM:

> Hello teacher,
> 1) The error message is in the attachment (the file flink-root-python-node-master1aSdM.log). The error is:
> Caused by: java.lang.IllegalArgumentException: Could not find a
> 'KafkaClient' entry in the JAAS configuration. System property
> 'java.security.auth.login.config' is not set  at
> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
> 2) This is the jaas.conf from the Flink configuration
> 3) PySpark consuming Kafka and writing to HDFS works fine; this is Spark's jaas.conf
> 3.1) This is the jaas.conf from the Spark configuration
>
> 3.2) This is the jaas-zk.conf from the Spark configuration
>
>
> Do I need to modify this file? The fixes Baidu turns up are all for Java, and I do not know how this one should be changed.
>
>
> -- Original message --
> *From:* "user-zh" ;
> *Sent:* Saturday, January 30, 2021 at 3:49 PM
> *To:* "user-zh";
> *Subject:* Re: Help needed (PyFlink)
>
> Hi,
> You can check the log of the task manager running the source task to see whether the consumer successfully obtained the Kafka
> partition metadata and whether authentication succeeded.
>
> 瞿叶奇 <389243...@qq.com> wrote on Saturday, January 30, 2021 at 3:14 PM:
>
> > Hello teacher, there is no problem at all with consuming; the command line consumes the data normally.
> >
> >
> >
> >
> > -- Original message --
> > *From:* "user-zh" ;
> > *Sent:* Saturday, January 30, 2021 at 3:08 PM
> > *To:* "user-zh";
> > *Subject:* Re: Help needed (PyFlink)
> >
> > First check whether the Kafka data can be consumed from the command line.
> >
> > Make sure consumption works from the command line before using Flink.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2021-01-30 14:25:57, "瞿叶奇" <389243...@qq.com> wrote:
> >
> >
> Hello teacher, I want to use Flink to consume from Kafka and write a local CSV file. The problem right now is that both the Flink and Kafka clusters require Kerberos authentication, and I am using pyflink. The program runs without errors, but it does not consume any data: the CSV file has no results, while its modification time keeps being updated. I suspect the Kafka connection
> > has a problem and hope you can help clear this up.
> > 1) Kafka producing data:
> >
> > 2) The pyflink program
> >
> >
> > #!/usr/bin/python3.7
> > # -*- coding: UTF-8 -*-
> > from pyflink.datastream import StreamExecutionEnvironment,
> > CheckpointingMode
> > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes,
> > CsvTableSink, WriteMode, SqlDialect
> > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> > s_env = StreamExecutionEnvironment.get_execution_environment()
> > s_env.set_parallelism(1)
> > s_env.enable_checkpointing(3000)
> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
> > st_env.use_catalog("default_catalog")
> > st_env.use_database("default_database")
> >
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
> ").property("security.protocol",
> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
> ').property("bootstrap.servers",
> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
> > DataTypes.BIGINT()),DataTypes.FIELD("name",
> > DataTypes.STRING())]))).with_schema(Schema().field("id",
> > DataTypes.BIGINT()).field("name",
> > DataTypes.STRING())).register_table_source("sourceKafka")
> > fieldNames = ["id", "name"]
> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
> ",",
> > 1, WriteMode.OVERWRITE)
> > st_env.register_table_sink("csvTableSink", csvSink)
> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
> > resultQuery = resultQuery.insert_into("csvTableSink")
> > st_env.execute("pyflink-kafka-v2")
> > 3)pyflink-shell.sh local
> >
> > 4) Run result
> > While the program runs in pyflink-shell local, I start the producer to generate data and then check the result file, as follows:
> >
> >
> >
> You can see the file really is being updated (its modification time keeps changing) but it stays empty. On the one hand I hope pyflink can add the ability to print to the console; on the other hand I hope you can provide an example of connecting to a Kerberos-authenticated Kafka. I am an engineer on the new architecture overhaul of the Shaanxi State Grid electricity consumption data collection system, and we plan to use Flink to test Kafka-to-HDFS data transfer. I hope you can give us an example to help us complete the test.
> >
> >
> >
> >
>