Limit the outbound queue in the client library

I have a client using the Mosquitto client library that emits data faster than the target broken can ingest it. The result is that the outbound queue in the client is growing until all available memory is exhausted. How can I limit the number of publish messages that exist in the client’s outbound queue?

For example:

  • Is there an efficient way to get a count of the number of queued messages?
  • Is there an efficient way to get an aggregate payload length of all queued messages?
  • Is there a mechanism that automatically rejects new publish messages when the queue reaches a certain length?
  • Is there a way to prune the queue, or an interface that would allow me to write my own pruning function (e.g., traverse the queue and remove old messages destined for the same topic)?

Is there an existing mechanism that I am unaware of that would help here?

Hello,

At the moment none the options you have asked about are available. Keeping your own list of messages ready to be published may be workable - especially if you have messages for the same topic and the oldest can be discarded. You could keep track of which messages were yet to be sent to the broker by using the mid parameter from the mosquitto_publish() call, and correlating that with the same parameter in the on_publish callback. This isn’t a very satisfactory solution though, especially if the client loses its connection because you may get out of sync.

Other thoughts around this - maybe check if you’re using QoS 2 whether you really need to do that, or whether the target broker can be configured to have a bigger inflight message queue which may help with throughput.

And finally, yes there should be some mechanism to deal with this situation in the library.

Regards,

Roger

Thanks for the reply. I’m using QoS 0 in this case - I’m just overloading the broker.

One other potential mitigation would be to expose the state of the socket to the client code. That is, give the client a way to know that the socket buffer is full so it can avoid even enqueueing a message. That would allow my client code to implement any message deletion algorithm I like without having to prune the queue within the client library.

You can get the socket using mosquitto_socket().

Querying the socket for its writable status is at a minimum a select() call, which should only be made if there is reason to suspect the socket buffer is full, so querying on every write would be potentially slow at high volumes. Instead, I looked for a minor edit in the library that would do the job.

I added a “socket_full” boolean flag to struct mosquitto, and modified packet__write:

  • if net__write produces EAGAIN, set socket_full to true
  • if net__write results in an incomplete write, set socket_full to true
  • before calling net__write, check if socket_full is true. If true, return MOSQ_ERR_SUCCESS

I modified “mosquitto_loop” to set socket_full to false when FD_ISSET(mosq->sock, &writefds)

The result::

  • it allows me to test socket_full at zero cost
  • it reduces system calls, since it will no longer attempt net_write() when the socket is known to be full
  • In my test case, it resulted in a 10-15% speed improvement

Incidentally, while testing I noticed that there is a call to send() of a single zero byte in packet__queue that is documented to break out of a select() when running in threaded mode. This call consumes more time than all other processing put together in my test case. I changed the condition around that call to only execute if mosq->threaded != mosq_ts_none. That reduced the processing time for my test case by 75%.

Thank you, that’s an interesting idea. mosquitto_want_write() may have been sufficient for you, although probably wouldn’t provide the same performance.

The socketpair code can be optionally disabled from 2.1 onwards, but I agree that it doesn’t make sense to have it in a non-threaded mode.