[ https://issues.apache.org/jira/browse/APEXCORE-810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514868#comment-16514868 ]
ASF GitHub Bot commented on APEXCORE-810: ----------------------------------------- vrozov closed pull request #595: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns URL: https://github.com/apache/apex-core/pull/595 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 3e8846d1e9..af5db09b6d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -115,12 +115,7 @@ public void addConnection(WriteOnlyClient connection) */ public void removeChannel(WriteOnlyClient client) { - for (PhysicalNode pn : physicalNodes) { - if (pn.getClient() == client) { - physicalNodes.remove(pn); - break; - } - } + physicalNodes.removeIf(node -> (node.getClient().equals(client))); } /** diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c5700f2690..6332a18804 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -24,9 +24,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -117,9 +114,11 @@ public void registered(SelectionKey key) @Override public void unregistered(SelectionKey key) { + logger.debug("Unregistered {}", this); for (LogicalNode ln : subscriberGroups.values()) { ln.boot(); } + super.unregistered(key); /* * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor. */ @@ -860,41 +859,32 @@ private void teardown() } torndown = true; - /* - * if the publisher unregistered, all the downstream guys are going to be unregistered anyways - * in our world. So it makes sense to kick them out proactively. Otherwise these clients since - * are not being written to, just stick around till the next publisher shows up and eat into - * the data it's publishing for the new subscribers. - */ - - /** - * since the publisher server died, the queue which it was using would stop pumping the data unless - * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node - * with the same identifier as the one which just died. - */ - if (publisherChannels.containsValue(this)) { - final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator(); - while (i.hasNext()) { - if (i.next().getValue() == this) { - i.remove(); - break; - } - } - } - - ArrayList<LogicalNode> list = new ArrayList<>(); - String publisherIdentifier = datalist.getIdentifier(); - Iterator<LogicalNode> iterator = subscriberGroups.values().iterator(); - while (iterator.hasNext()) { - LogicalNode ln = iterator.next(); - if (publisherIdentifier.equals(ln.getUpstream())) { - list.add(ln); + serverHelperExecutor.submit(() -> + { + /* + * if the publisher unregistered, all the downstream guys are going to be unregistered anyways + * in our world. So it makes sense to kick them out proactively. Otherwise these clients since + * are not being written to, just stick around till the next publisher shows up and eat into + * the data it's publishing for the new subscribers. + */ + + /** + * since the publisher server died, the queue which it was using would stop pumping the data unless + * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node + * with the same identifier as the one which just died. + */ + String publisherIdentifier = datalist.getIdentifier(); + if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) { + logger.warn("{} could not be removed from channels", Publisher.this); } - } - for (LogicalNode ln : list) { - ln.boot(); - } + subscriberGroups.forEach((type, ln) -> { + if (publisherIdentifier.equals(ln.getUpstream())) { + logger.debug("Booting logical node {} from publisher", ln); + ln.boot(); + } + }); + }); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Concurrent modification exception during connection cleanup in buffer server > ---------------------------------------------------------------------------- > > Key: APEXCORE-810 > URL: https://issues.apache.org/jira/browse/APEXCORE-810 > Project: Apache Apex Core > Issue Type: Bug > Reporter: Pramod Immaneni > Assignee: Pramod Immaneni > Priority: Minor > > {noformat} > ERROR com.datatorrent.bufferserver.server.Server: Buffer server > Server@56cfec7c\{address=/0:0:0:0:0:0:0:0:45081} failed to tear down > subscriber > Subscriber@2ff22212{ln=LogicalNode@36d87f9eidentifier=tcp://xxxxxx:45081/2.output.1, > upstream=2.output.1, group=rand_console/3.input, partitions=[], > iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@6caeed6a{da=com.datatorrent.bufferserver.internal.DataList$Block@506501e4{identifier=2.output.1, > data=16777216, readingOffset=0, writingOffset=1822, > starting_window=59dc9c3000000001, ending_window=59dc9c3000000055, refCount=2, > uniqueIdentifier=0, next=null, > future=null}}}}.java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922) > at java.util.HashMap$KeyIterator.next(HashMap.java:956) > at > com.datatorrent.bufferserver.internal.LogicalNode.removeChannel(LogicalNode.java:118) > at com.datatorrent.bufferserver.server.Server$3.run(Server.java:410) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)