[GitHub] storm pull request #2909: STORM-3123 - add support for Kafka security config...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/storm/pull/2909 STORM-3123 - add support for Kafka security config in storm-kafka-monitor 1.x version of https://github.com/apache/storm/pull/2906 You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/storm STORM-3123-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2909.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2909 commit 371cc269a6dec5ee6172d7631f7f5574d6eab566 Author: Vipin Rathor Date: 2018-07-12T00:01:36Z STORM-3123 - add support for Kafka security config in storm-kafka-monitor commit bbe7827987dc03393652740522975b7bf0169c64 Author: Arun Mahadevan Date: 2018-11-12T19:19:31Z STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag commit 3e655cba0a621aae24a3300c2b75d1b15300032b Author: Arun Mahadevan Date: 2018-11-16T01:12:44Z STORM-3123: Address review comments ---
[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2906 ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r234019467 --- Diff: storm-server/src/main/java/org/apache/storm/LocalCluster.java --- @@ -1197,8 +1238,9 @@ public IBolt makeAckerBoltImpl() { * When running a topology locally, for tests etc. It is helpful to be sure that the topology is dead before the test exits. This is * an AutoCloseable topology that not only gives you access to the compiled StormTopology but also will kill the topology when it * closes. - * + * ``` --- End diff -- Minor one. Not sure if ``` turns it into pre formatted code. Do we need to use ` tag? ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2908 STORM-3276: Updated Flux to deal with storm local correctly You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3276 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2908.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2908 commit 150160f534145d568c0368ec19823813ad16136c Author: Robert (Bobby) Evans Date: 2018-11-15T20:12:48Z STORM-3276: Updated Flux to deal with storm local correctly ---
[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2906#discussion_r233963942 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java --- @@ -27,12 +27,15 @@ private final String consumerGroupId; // consumer group id for which the offset needs to be calculated private final String bootStrapBrokers; // bootstrap brokers private final String securityProtocol; // security protocol to connect to kafka +private final String consumerConfig; // security configuration file to connect to secure kafka --- End diff -- it can be any properties. renamed to `consumerPropertiesFileName`. ---
[GitHub] storm issue #2906: STORM-3123 - add support for Kafka security config in sto...
Github user arunmahadevan commented on the issue: https://github.com/apache/storm/pull/2906 @HeartSaVioR thanks for reviewing. Addressed comments. ---
[GitHub] storm pull request #2907: STORM-2990, STORM-3279: Fix issue where Kafka Trid...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2907#discussion_r233750714 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java --- @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord record) * * This is the first batch for this partition * This is a replay of the first batch for this partition - * This is batch n for this partition, where batch 0...n-1 were all empty * * * @return the offset of the next fetch */ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) { -if (isFirstPoll(tp)) { -if (firstPollOffsetStrategy == EARLIEST) { +if (isFirstPollSinceExecutorStarted(tp)) { +boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null +|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId()); +if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) { LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); consumer.seekToBeginning(Collections.singleton(tp)); -} else if (firstPollOffsetStrategy == LATEST) { +} else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) { LOG.debug("First poll for topic partition [{}], seeking to partition end", tp); consumer.seekToEnd(Collections.singleton(tp)); } else if (lastBatchMeta != null) { LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch -} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) { +} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) { --- End diff -- No, doesn't look like it. Good catch. ---