"Outgoing publish message was dropped. Receiving consumer: ... "

 Problem

When event.log shows the error message like:

Outgoing publish message was dropped. Receiving consumer: nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======, topic: 1234-dev/MyData//1.234/STATION123E/NP/9876/ABCD/1146/12/3H, qos: 0, reason: The QoS 0 memory limit exceeded, size: 52,429,963 bytes, max: 52,428,800 bytes.

The message reason: The QoS 0 memory limit exceeded indicates that QoS messages are being consumed more slowly than produced for some time, the QoS 0 memory limit is exceeded per consumer, hence the QoS 0 message was dropped, as for QoS 0 messages there is no delivery guarantee.

Note that the message contains no Client ID but the Receiving consumer's ID. This indicates that the slow recipient of the message is not an MQTT Client but a HiveMQ Enterprise Extension Consumer.

The event log message is being logged in the following way:

logMessageDropped.debug(OUTBOUND_PUBLISH_DROPPED_CONSUMER_MARKER, "Outgoing publish message was dropped. Receiving consumer: {}, topic: {}, qos: {}, reason: {}.", valueFromNullable(CONSUMER_ID_KEY, consumerId), valueFromNullable(TOPIC_KEY, topic), value(QOS_KEY, qos), value(REASON_KEY, reason));

So the consumer id is after the words Receiving consumer: {} and before the comma ,.

Each Extension uses its own way to generate a CONSUMER_ID_KEY.

This article will explain how to find out, to which HiveMQ Enterprise Extension the consumer belongs and how to identify the corresponding configuration item in the extension’s configuration files.

 Extension for Google Pub/Sub

  1. Look at the configuration XML of MQTT to Google Cloud Pub/Sub Extension. The mapping looks the following way:

    <mqtt-to-pubsub-mappings> <mqtt-to-pubsub-mapping> <id>mapping-01</id> <!-- REQUIRED --> <pubsub-connection>your-custom-connection-id</pubsub-connection> <!-- REQUIRED --> <preserve-message-properties>true</preserve-message-properties> <!-- OPTIONAL, Default = false --> <mqtt-topic-filters> <!-- REQUIRED --> <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter> <!-- REQUIRED, at least one --> <mqtt-topic-filter>+/YourData/#</mqtt-topic-filter> <!-- OPTIONAL --> <mqtt-topic-filter>+/OurData/#</mqtt-topic-filter> <!-- OPTIONAL --> </mqtt-topic-filters> ... ... </mqtt-to-pubsub-mapping> </mqtt-to-pubsub-mappings>

    Where a mapping entity <id>mapping-01</id> contains multiple topic filters <mqtt-topic-filter>topic/a</mqtt-topic-filter>.

  2. Each unique Consumer Id will consist of a Mapping Entity Id and Topic Filter:

    final String consumerId = entityId + "|" + topicFilter;
  3. The consumer id will be encoded to avoid unsupported characters. The encoding method is Base32 encoding which is all in lowercase letters and without any padding characters:

    Encoding the ConsumerId:

  4. Decode ConsumerId nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem====== into a human-readable form:

    Output mapping-01|+/MyData/# means that the consumer corresponds to the Pub/Sub configuration file mapping with id mapping-01 and topic filter +/MyData/#.

  5. Find the corresponding entity in the Google Cloud Pub/Sub configuration file:

If a Pub/Sub consumer frequently experiences this kind of dropped message: “The QoS 0 memory limit exceeded“, it can indicate that the Pub/Sub is consuming messages too slowly or has unstable network connections.

Bridge Extension

  1. Look at the configuration XML of Bridge Extension.

     

  2. Consumer Id will be the Bridge name, i.e. my-bridge-1.

Amazon Kinesis Extension

Amazon Kinesis Extension is using the same approach as Google Pub/Sub Extension:

  1. Look at the extension configuration:

     

  2. ConsumerId will be base32 encoded lowercase string my-mqtt-to-kinesis-route|mqtt/topic/anv4s23lror2c25dpfvvws3tfonuxgllsn52xizl4nvyxi5bporxxa2ldf5qq====.

Kafka Extension

In the Kafka Extension, a Consumer Id consists of kafka-extension_ prefix concatenated to a mapping id:

Look at the kafka-cofiguration.xml:

The Consumer id will look like kafka-extension_mapping01.