ywu

ActiveMQ Failover Broker URL Order


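ActiveMQ clients support failover reconnection. Yesterday, after setting up ActiveMQ in a master/slave configuration, I had the client connect to the brokers through the failover protocol, as shown below: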

failover:(tcp://master:61616,tcp://slave:61616)

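During testing, the client sometimes connected to master and sometimes to slave. That was awkward: the natural expectation is that the broker URL listed first is tried first, and the later URL is used only when the first cannot be reached, that is, front to back. The test results showed that this order is not guaranteed.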
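The front-to-back behavior expected here can be written down as a small loop. This is only a sketch of that expectation, not ActiveMQ code: the class `OrderedConnectSketch`, the method `firstReachable`, and the `tryConnect` check are hypothetical names introduced for illustration, and plain strings stand in for real connections.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustration of the expected front-to-back behavior; not ActiveMQ code.
class OrderedConnectSketch {

    // Tries each broker URL in list order and returns the first one for which
    // the (hypothetical) tryConnect check succeeds, or null if none does.
    static String firstReachable(List<String> brokerUrls, Predicate<String> tryConnect) {
        for (String url : brokerUrls) {
            if (tryConnect.test(url)) {
                return url;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("tcp://master:61616", "tcp://slave:61616");

        // Both brokers reachable: the first configured entry is chosen.
        System.out.println(firstReachable(urls, url -> true));

        // Master unreachable: the loop falls through to the second entry.
        System.out.println(firstReachable(urls, url -> !url.contains("master")));
    }
}
```

Under this model slave would be chosen only when master fails, which is exactly what the test runs did not show.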

I turned on ActiveMQ's debug logging through logback:

<logger name="org.apache.activemq" level="debug" additivity="false">
    <appender-ref ref="STDOUT"/>
</logger>

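The log confirmed that the client sometimes connected straight to slave instead of trying master first every time, even though master was healthy. So the client clearly does some processing of its own and does not simply try the URLs front to back.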

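The log prints "Successfully connected to slave...". Because my logback pattern includes the source line number, I quickly located the code: line 876 of the org.apache.activemq.transport.failover.FailoverTransport class (the client uses activemq-all-5.4.3.jar). The code is as follows: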

final boolean doReconnect() {
        Exception failure = null;
        synchronized (reconnectMutex) {

            // If updateURIsURL is specified, read the file and add any new
            // transport URI's to this FailOverTransport. 
            // Note: Could track file timestamp to avoid unnecessary reading.
            String fileURL = getUpdateURIsURL();
            if (fileURL != null) {
                BufferedReader in = null;
                String newUris = null;
                StringBuffer buffer = new StringBuffer();

                try {
                    in = new BufferedReader(getURLStream(fileURL));
                    while (true) {
                        String line = in.readLine();
                        if (line == null) {
                            break;
                        }
                        buffer.append(line);
                    }
                    newUris = buffer.toString();
                } catch (IOException ioe) {
                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ioe) {
                            // ignore
                        }
                    }
                }
                
                processNewTransports(isRebalanceUpdateURIs(), newUris);
            }

            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }

            if (connectedTransport.get() != null || disposed || connectionFailure != null) {
                return false;
            } else {
                List<URI> connectList = getConnectList();
                if (connectList.isEmpty()) {
                    failure = new IOException("No uris available to connect to.");
                } else {
                    if (!useExponentialBackOff || reconnectDelay == -1) {
                        reconnectDelay = initialReconnectDelay;
                    }
                    synchronized (backupMutex) {
                        if (backup && !backups.isEmpty()) {
                            BackupTransport bt = backups.remove(0);
                            Transport t = bt.getTransport();
                            URI uri = bt.getUri();
                            t.setTransportListener(myTransportListener);
                            try {
                                if (started) {
                                    restoreTransport(t);
                                }
                                reconnectDelay = initialReconnectDelay;
                                failedConnectTransportURI = null;
                                connectedTransportURI = uri;
                                connectedTransport.set(t);
                                reconnectMutex.notifyAll();
                                connectFailures = 0;
                                LOG.info("Successfully reconnected to backup " + uri);
                                return false;
                            } catch (Exception e) {
                                LOG.debug("Backup transport failed", e);
                            }
                        }
                    }

                    Iterator<URI> iter = connectList.iterator();
                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
                        URI uri = iter.next();
                        Transport t = null;
                        try {
                            LOG.debug("Attempting connect to: " + uri);
                            SslContext.setCurrentSslContext(brokerSslContext);
                            t = TransportFactory.compositeConnect(uri);
                            t.setTransportListener(myTransportListener);
                            t.start();

                            if (started) {
                                restoreTransport(t);
                            }

                            LOG.debug("Connection established");
                            reconnectDelay = initialReconnectDelay;
                            connectedTransportURI = uri;
                            connectedTransport.set(t);
                            reconnectMutex.notifyAll();
                            connectFailures = 0;
                            // Make sure on initial startup, that the
                            // transportListener
                            // has been initialized for this instance.
                            synchronized (listenerMutex) {
                                if (transportListener == null) {
                                    try {
                                        // if it isn't set after 2secs - it
                                        // probably never will be
                                        listenerMutex.wait(2000);
                                    } catch (InterruptedException ex) {
                                    }
                                }
                            }
                            if (transportListener != null) {
                                transportListener.transportResumed();
                            } else {
                                LOG.debug("transport resumed by transport listener not set");
                            }
                            if (firstConnection) {
                                firstConnection = false;
                                LOG.info("Successfully connected to " + uri);
                            } else {
                                LOG.info("Successfully reconnected to " + uri);
                            }
                            connected = true;
                            return false;
                        } catch (Exception e) {
                            failure = e;
                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
                            if (t != null) {
                                try {
                                    t.stop();
                                } catch (Exception ee) {
                                    LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
                                }
                            }
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }
            int reconnectAttempts = 0;
            if (firstConnection) {
                if (this.startupMaxReconnectAttempts != 0) {
                    reconnectAttempts = this.startupMaxReconnectAttempts;
                }
            }
            if (reconnectAttempts == 0) {
                reconnectAttempts = this.maxReconnectAttempts;
            }
            if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
                LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                connectionFailure = failure;

                // Make sure on initial startup, that the transportListener has
                // been initialized
                // for this instance.
                synchronized (listenerMutex) {
                    if (transportListener == null) {
                        try {
                            listenerMutex.wait(2000);
                        } catch (InterruptedException ex) {
                        }
                    }
                }

                if (transportListener != null) {
                    if (connectionFailure instanceof IOException) {
                        transportListener.onException((IOException) connectionFailure);
                    } else {
                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
                    }
                }
                reconnectMutex.notifyAll();
                return false;
            }
        }
        if (!disposed) {

            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
            synchronized (sleepMutex) {
                try {
                    sleepMutex.wait(reconnectDelay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            if (useExponentialBackOff) {
                // Exponential increment of reconnect delay.
                reconnectDelay *= backOffMultiplier;
                if (reconnectDelay > maxReconnectDelay) {
                    reconnectDelay = maxReconnectDelay;
                }
            }
        }
        return !disposed;
    }

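The line List<URI> connectList = getConnectList(); obtains the configured broker URLs. Setting a breakpoint and debugging showed that the URIs in this list are sometimes no longer in the configured order, so I stepped into the getConnectList method: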

private List<URI> getConnectList() {
        ArrayList<URI> l = new ArrayList<URI>(uris);
        boolean removed = false;
        if (failedConnectTransportURI != null) {
            removed = l.remove(failedConnectTransportURI);
        }
        if (randomize) {
            // Randomly, reorder the list by random swapping
            for (int i = 0; i < l.size(); i++) {
                int p = (int) (Math.random() * 100 % l.size());
                URI t = l.get(p);
                l.set(p, l.get(i));
                l.set(i, t);
            }
        }
        if (removed) {
            l.add(failedConnectTransportURI);
        }
        LOG.debug("urlList connectionList:" + l);
        return l;
    }
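To see the effect of this swap loop in isolation, it can be imitated outside ActiveMQ. The sketch below is a standalone imitation, not the library's code: `ConnectListDemo`, `connectOrder`, and `countFirstStaysFirst` are hypothetical names, plain strings stand in for URIs, and a seeded java.util.Random replaces Math.random() so that runs are repeatable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Standalone imitation of the swap loop in getConnectList(); not ActiveMQ code.
class ConnectListDemo {

    // Returns the order in which the broker URLs would be tried.
    static List<String> connectOrder(List<String> uris, boolean randomize, Random rnd) {
        List<String> l = new ArrayList<String>(uris);
        if (randomize) {
            // Same idea as the original: swap each position with a random one.
            for (int i = 0; i < l.size(); i++) {
                int p = rnd.nextInt(l.size());
                String t = l.get(p);
                l.set(p, l.get(i));
                l.set(i, t);
            }
        }
        return l;
    }

    // Counts how often the first configured URL is still first over `runs` calls.
    static int countFirstStaysFirst(List<String> uris, boolean randomize, int runs, long seed) {
        Random rnd = new Random(seed);
        int hits = 0;
        for (int n = 0; n < runs; n++) {
            if (connectOrder(uris, randomize, rnd).get(0).equals(uris.get(0))) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> uris = Arrays.asList("tcp://master:61616", "tcp://slave:61616");
        System.out.println("randomize=true,  master first in "
                + countFirstStaysFirst(uris, true, 1000, 42L) + " of 1000 runs");
        System.out.println("randomize=false, master first in "
                + countFirstStaysFirst(uris, false, 1000, 42L) + " of 1000 runs");
    }
}
```

With two URLs and randomization on, master ends up first in only part of the runs; with randomization off the configured order is returned unchanged every time.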

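There is a randomize flag here, and it defaults to true, so by default the configured URLs are reordered. That is why the initial connection may not follow the expected front-to-back order. The fix is simple: append a parameter to the connection URL that sets randomize to false: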

failover:(tcp://master:61616,tcp://slave:61616)?randomize=false

With this setting the client always tries master first and connects to slave only when master cannot be reached.
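When the broker list comes from configuration rather than a hard-coded string, the same URL can be assembled in code. This is a minimal sketch under that assumption: `FailoverUrlBuilder` and `failoverUrl` are hypothetical helper names, and only the URL syntax and the randomize option are taken from the post.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that assembles a failover URL; not part of ActiveMQ.
class FailoverUrlBuilder {

    // Joins the broker URLs in the given order and appends the randomize option.
    static String failoverUrl(List<String> brokerUrls, boolean randomize) {
        if (brokerUrls.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URL is required");
        }
        return "failover:(" + String.join(",", brokerUrls) + ")?randomize=" + randomize;
    }

    public static void main(String[] args) {
        // List the preferred broker first; randomize=false keeps that order.
        String url = failoverUrl(
                Arrays.asList("tcp://master:61616", "tcp://slave:61616"), false);
        System.out.println(url);
    }
}
```

The resulting string is what would be handed to the client as its broker URL, for example to the ActiveMQConnectionFactory(String brokerURL) constructor.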

 

 

Comments
#2 ywu 2016-08-17
Tyrion wrote:
The official ActiveMQ documentation explains this option:
http://activemq.apache.org/failover-transport-reference.html

The Failover transport chooses a URI at random by default. This effectively load-balances clients over multiple brokers. However, to have a client connect to a primary first and only connect to a secondary backup broker when the primary is unavailable, set randomize=false.

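Still, kudos for being willing to dig through the source.

Thanks. I had read the documentation and then forgotten it; going straight to the code is the blunter approach.
#1 Tyrion 2016-08-17
The official ActiveMQ documentation explains this option: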
http://activemq.apache.org/failover-transport-reference.html

The Failover transport chooses a URI at random by default. This effectively load-balances clients over multiple brokers. However, to have a client connect to a primary first and only connect to a secondary backup broker when the primary is unavailable, set randomize=false.

Still, kudos for being willing to dig through the source.
