[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819809#comment-16819809 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

I mean, does this cause any data processing issue other than the stack trace?

> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems that with this change - 
> [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50]
>  - to the Kafka Source Provider in Spark 2.3, a Kafka source cannot be run in 
> micro-batch mode but only in continuous mode. Is that understanding correct?
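> Roughly, the load call looks like the sketch below (broker, topic, and 
> options are illustrative, not the exact job; the keystore path deliberately 
> does not exist):
> {code:python}
> # Hypothetical reproduction - broker, topic, and SSL options are
> # illustrative stand-ins, not the exact job that produced the trace below.
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("kafka-microbatch-question").getOrCreate()
>
> # No trigger is set anywhere here, so micro-batch should apply by default,
> # yet load() already goes through createContinuousReader (see trace below).
> df = (spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "test-topic")
>       .option("kafka.security.protocol", "SSL")
>       .option("kafka.ssl.keystore.location", "non-existent")
>       .load()){code}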
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
> When running a simple data stream loader for Kafka without an SSL cert, it 
> goes through this code path -
>  
> {code:java}
> ...
> ...
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>  
> Note that I haven't selected `trigger=continuous...` when creating the 
> dataframe, yet the code still goes through the continuous path. My 
> understanding was that `continuous` is optional and not the default (a quick 
> sketch of what I mean follows).
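> To illustrate: in PySpark the trigger is only chosen at write time, and 
> micro-batch is the default unless continuous processing is requested 
> explicitly (reusing df from the sketch above; the sink and checkpoint path 
> are illustrative):
> {code:python}
> # Default trigger: micro-batch. Sink and checkpoint path are illustrative.
> query = (df.writeStream
>          .format("console")
>          .option("checkpointLocation", "/tmp/chk")
>          .start())  # no trigger specified -> micro-batch execution
>
> # Continuous processing has to be opted into explicitly:
> # query = (df.writeStream
> #          .format("console")
> #          .trigger(continuous="1 second")
> #          .start()){code}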
>  
> Please clarify.


