[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819473#comment-16819473 ]
Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

[~gsomogyi] I haven't tried it on master. I'm facing the problem with Spark 2.3.2. Here is a complete log:

{code:java}
➜ ~/spark ((HEAD detached at v2.3.2)) ✗ ./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
Python 2.7.10 (default, Feb 22 2019, 21:17:52)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.37.14)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /<REDACTED>/.ivy2/cache
The jars for the packages stored in: /<REDACTED>/.ivy2/jars
:: loading settings :: url = jar:file:/<REDACTED>/spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 in central
	found org.apache.kafka#kafka-clients;0.10.0.1 in local-m2-cache
	found net.jpountz.lz4#lz4;1.3.0 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.2.6 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.16 in spark-list
	found org.spark-project.spark#unused;1.0.0 in local-m2-cache
:: resolution report :: resolve 1580ms :: artifacts dl 4ms
	:: modules in use:
	net.jpountz.lz4#lz4;1.3.0 from local-m2-cache in [default]
	org.apache.kafka#kafka-clients;0.10.0.1 from local-m2-cache in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 from central in [default]
	org.slf4j#slf4j-api;1.7.16 from spark-list in [default]
	org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
	org.xerial.snappy#snappy-java;1.1.2.6 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   6   |   2   |   2   |   0   ||   6   |   0   |
	---------------------------------------------------------------------
:: problems summary ::
:::: ERRORS
	unknown resolver null
	unknown resolver null
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8
	confs: [default]
	0 artifacts copied, 6 already retrieved (0kB/6ms)
19/04/16 16:31:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 2.7.10 (default, Feb 22 2019 21:17:52)
SparkSession available as 'spark'.
>>> df = spark.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/<REDACTED>/spark/python/pyspark/sql/streaming.py", line 403, in load
    return self._df(self._jreader.load())
  File "/<REDACTED>/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/<REDACTED>/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/<REDACTED>/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
	... 19 more
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:110)
	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
	... 22 more
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:205)
	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:190)
	at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:126)
	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:108)
	... 23 more
{code}

I'm running pyspark exactly as described in [https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html#deploying]. The problem is that when a non-existent keystore is given, the load goes through the `createContinuousReader` path even though no trigger has been specified; it should go through the micro-batch code path instead. (See the sketch at the end of this message for how I expect trigger selection to work.)

> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] - in Spark 2.3 for the Kafka source provider, a Kafka source cannot be run in micro-batch mode but only in continuous mode. Is that understanding correct?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
> When running a simple data stream loader for Kafka without a valid SSL cert, it goes through this code block:
> {code:java}
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...{code}
> Note that I haven't selected `trigger=continuous...` when creating the dataframe, yet the code is going through the continuous path. My understanding was that `continuous` is optional and not the default.
>
> Please clarify.
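For reference, here is a minimal sketch (not taken from the logs above; the broker address, topic, keystore path and password are placeholders, and it assumes the same spark-sql-kafka-0-10 package is on the classpath) of how I expect trigger selection to work: micro-batch should be the default, and the continuous path should only be entered when a continuous trigger is passed at writeStream time. The stack traces above, however, show a consumer being constructed inside `createContinuousReader` already during `load()`, before any trigger has been chosen.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-trigger-sketch").getOrCreate()

# load() already constructs a Kafka consumer (the stack trace above shows
# createContinuousReader -> KafkaOffsetReader -> KafkaConsumer), so the
# keystore file must exist at this point, regardless of the eventual trigger.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9093")           # placeholder broker
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.keystore.type", "PKCS12")
      .option("kafka.ssl.keystore.location", "/path/to/client.p12")  # placeholder, must exist
      .option("kafka.ssl.keystore.password", "changeit")             # placeholder password
      .option("subscribe", "some-topic")                             # placeholder topic
      .load())

# Micro-batch execution: my understanding is that this is the default,
# and that it can also be requested explicitly with a processing-time trigger.
micro_batch_query = (df.writeStream
                     .format("console")
                     .trigger(processingTime="10 seconds")
                     .start())

# Continuous execution should only happen when asked for explicitly:
# continuous_query = (df.writeStream
#                     .format("console")
#                     .trigger(continuous="1 second")
#                     .start())
{code}

If `createContinuousReader` is invoked from `load()` merely to resolve the schema, then the failure above is really about the missing keystore rather than about the query running in continuous mode, but that is exactly the point I'd like clarified.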