[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815491#comment-16815491 ]
Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

{code:java}
In [5]: df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",)\
   ...: .option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
2019-04-11 15:04:49.903 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9093]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-a944d4a9-0257-492a-88b4-bba9decebb28-876181411-driver-0
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = non-existent
	ssl.keystore.password = [hidden]
	ssl.keystore.type = PKCS12
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/<redacted>/ in <module>()
----> 1 df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()

/<redacted>/python/pyspark/sql/streaming.pyc in load(self, path, format, schema, **options)
    401             return self._df(self._jreader.load(path))
    402         else:
--> 403             return self._df(self._jreader.load())
    404 
    405     @since(2.0)

/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/<redacted>/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
	... 19 more
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
	... 23 more
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
	at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
	... 24 more
{code}
Going through these logs, notice the calls to `createContinuousReader`. This looks like a bug, or at least behaviour that isn't documented. My understanding was that this path should go through `createMicroBatchReader`, but it doesn't.
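For completeness, here is roughly what a plain micro-batch read/write of the same shape would look like. This is only a sketch, not taken from the job above: the bootstrap server, topic, keystore path/password and checkpoint directory are placeholders, and it assumes the keystore file actually exists. As far as I understand, the trigger supplied on the write side is what selects micro-batch versus continuous execution; the failure above happens earlier, inside `load()`, while the consumer is being constructed.

{code:python}
# Sketch only: placeholder servers, topic, keystore and checkpoint paths.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-micro-batch-sketch").getOrCreate()

# readStream.load() only declares the source; no trigger is involved yet.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.keystore.type", "PKCS12")
      .option("kafka.ssl.keystore.location", "/path/to/client-keystore.p12")  # must exist on the driver
      .option("kafka.ssl.keystore.password", "placeholder-password")
      .option("subscribe", "my-topic")
      .load())

# Micro-batch execution: the default trigger or an explicit processing-time
# trigger. Continuous execution would require .trigger(continuous="1 second").
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/kafka-sketch-checkpoint")
         .trigger(processingTime="10 seconds")
         .start())

query.awaitTermination()
{code}

Either way, the keystore location has to resolve to a real file on the driver, otherwise consumer construction fails during `load()` exactly as in the trace above.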
> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
> Key: SPARK-27409
> URL: https://issues.apache.org/jira/browse/SPARK-27409
> Project: Spark
> Issue Type: Question
> Components: Structured Streaming
> Affects Versions: 2.3.2
> Reporter: Prabhjot Singh Bharaj
> Priority: Major
>
> It seems with this change -
> [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50]
> in Spark 2.3 for Kafka Source Provider, a Kafka source can not be run in
> micro-batch mode but only in continuous mode. Is that understanding correct ?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at
> org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at
> org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at
> org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: java.io.FileNotFoundException:
> non-existent (No such file or directory)
> E at
> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException:
> java.io.FileNotFoundException: non-existent (No such file or directory)
> E at
> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at
> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or
> directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at
> org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at
> org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at
> org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at
> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
> When running a simple data stream loader for kafka without an SSL cert, it
> goes through this code block -
>
> {code:java}
> ...
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>
> Note that I haven't selected `trigger=continuous...` when creating the
> dataframe, still the code is going through the continuous path. My
> understanding was that `continuous` is optional and not the default.
>
> Please clarify.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org