Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

\uD83E\uDD14 Problem

When event.log shows the error message like:

...

The message reason: The QoS 0 memory limit exceeded indicates that QoS messages are being consumed more slowly than produced for some time, the QoS 0 memory limit (by default 500,000) is exceeded per consumer, hence the QoS 0 message was dropped, as for QoS 0 messages there is no delivery guarantee.

...

This article will explain how to find out, to which HiveMQ Enterprise Extension the consumer belongs and how to identify the corresponding configuration item in the extension’s configuration files.

\uD83C\uDF31 Extension for Google Pub/Sub

  1. Look at the configuration XML of MQTT to Google Cloud Pub/Sub Extension. The mapping looks the following way:

    Code Block
    languagexml
    <mqtt-to-pubsub-mappings>
        <mqtt-to-pubsub-mapping>
            <id>mapping-01</id> <!-- REQUIRED -->
            <pubsub-connection>your-custom-connection-id</pubsub-connection> <!-- REQUIRED -->
            <preserve-message-properties>true</preserve-message-properties> <!-- OPTIONAL, Default = false -->
            <mqtt-topic-filters> <!-- REQUIRED -->
                <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter> <!-- REQUIRED, at least one -->
                <mqtt-topic-filter>+/YourData/#</mqtt-topic-filter> <!-- OPTIONAL -->
                <mqtt-topic-filter>+/OurData/#</mqtt-topic-filter> <!-- OPTIONAL -->
            </mqtt-topic-filters>
            ...
            ...
        </mqtt-to-pubsub-mapping>
    </mqtt-to-pubsub-mappings>

    Where a mapping entity <id>mapping-01</id> contains multiple topic filters <mqtt-topic-filter>topic/a</mqtt-topic-filter>.

  2. Each unique Consumer Id will consist of a Mapping Entity Id and Topic Filter:

    Code Block
    languagejava
    final String consumerId = entityId + "|" + topicFilter;
  3. The consumer id will be encoded to avoid unsupported characters. The encoding method is Base32 encoding which is all in lowercase letters and without any padding characters:

    Code Block
    languagejava
    BaseEncoding CONSUMER_ID_ENCODING = BaseEncoding.base32().lowerCase().omitPadding();

    Encoding the ConsumerId:

    Code Block
    languagejava
    return CONSUMER_ID_ENCODING.encode(consumerId.getBytes(StandardCharsets.UTF_8));
  4. Decode ConsumerId nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem====== into a human-readable form:

    Code Block
    languagepy
    import base64
    
    encoded_string = "nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======"
    decoded_bytes = base64.b32decode(encoded_string.encode(), casefold=True)
    decoded_string = decoded_bytes.decode()
    
    print(decoded_string) # Output: mapping-01|+/MyData/#

    Output mapping-01|+/MyData/# means that the consumer corresponds to the Pub/Sub configuration file mapping with id mapping-01 and topic filter +/MyData/#.

  5. Find the corresponding entity in the Google Cloud Pub/Sub configuration file:

Info

If a Pub/Sub consumer frequently experiences this kind of dropped message: “The QoS 0 memory limit exceeded“, it can indicate that the Pub/Sub is consuming messages too slowly or has unstable network connections.

\uD83C\

...

uDF31 Bridge Extension

  1. Look at the configuration XML of Bridge Extension.

    Code Block
    languagexml
    <bridges>
        <bridge>
            <enabled>true</enabled>
            <name>my-bridge-1</name>
            ...
        </bridge>
        <bridge>
            <enabled>true</enabled>
            <name>my-bridge-2</name>
            ...
        </bridge>
    </bridges>

  2. Consumer Id will be the Bridge name, i.e. my-bridge-1.

\uD83C\

...

uDF31 Amazon Kinesis Extension

Amazon Kinesis Extension is using the same approach as Google Pub/Sub Extension:

...

  1. Look at the extension configuration:

    Code Block
    languagexml
    <mqtt-to-kinesis-routes>
      <mqtt-to-kinesis-route>
          <id>my-mqtt-to-kinesis-route</id>
          <enabled>true</enabled>
          <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
          <region>eu-central-1</region>
          <mqtt-topic-filters>
              <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
          </mqtt-topic-filters>
          ...

  2. ConsumerId will be base32 encoded lowercase string my-mqtt-to-kinesis-route|mqtt/topic/anv4s23lror2c25dpfvvws3tfonuxgllsn52xizl4nvyxi5bporxxa2ldf5qq====.

Kafka Extension

In the Kafka Extension, a Consumer Id consists of kafka-extension_ prefix concatenated to a mapping id:

Code Block
languagejava
String CONSUMER_PREFIX = "kafka-extension_";
...
final String consumerId = CONSUMER_PREFIX + mqttToKafkaMapping.getId();

Look at the kafka-cofiguration.xml:

Code Block
languagexml
    <mqtt-to-kafka-mappings>
        <mqtt-to-kafka-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>kafka-topic</kafka-topic>
        </mqtt-to-kafka-mapping>
    </mqtt-to-kafka-mappings>

The Consumer id will look like kafka-extension_mapping01.

Filter by label (Content by label)
showLabelsfalse
max5
spacescom.atlassian.confluence.content.render.xhtml.model.resource.identifiers.SpaceResourceIdentifier@957
sortmodified
showSpacefalse
reversetrue
typepage
cqllabel = "kb-troubleshooting-articleerror" and type = "page" and space = "KB"
labelskb-troubleshooting-article