Just a second...

Specifying a reconnection strategy

Reconnection behavior can be configured using custom reconnection strategies.

The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.

Reconnection can only succeed if the client session is still available on Diffusion™ Cloud . The maximum time that Diffusion Cloud keeps client sessions available for reconnection is 60 seconds.

Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to Diffusion Cloud

Examples

JavaScript
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
const maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
const maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
const maximumAttempts = 25;

// Count the number of reconnection attempts we've made
let attempts = 0;

// Create a reconnection strategy that applies an exponential back-off
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session)
const reconnectionStrategy = (start, abort) => {
    if (attempts > maximumAttempts) {
        abort();
    } else {
        const wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

        // Wait the specified time period, and then start the reconnection attempt
        setTimeout(start, wait);
    }
};

// Connect to the server.
const session = await diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
});

session.on('disconnect', () => {
    // This will be called when we lose connection. Because we've specified the
    // reconnection strategy, it will be called automatically when this event
    // is dispatched
});

session.on('reconnect', () => {
    // If the session is able to reconnect within the reconnect timeout, this
    // event will be dispatched to notify that normal operations may resume
    attempts = 0;
});

session.on('close', () => {
    // If the session is closed normally, or the session is unable to reconnect,
    // this event will be dispatched to notify that the session is no longer
    // operational.
});
.NET
/// <summary>
/// Implementation that demonstrates session reconnection strategy.
/// </summary>
public sealed class SessionReconnection
{
    private int maximumTimeoutDuration = 1000 * 60 * 10;

    private ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy();

    public SessionReconnection(string serverUrl)
    {
        var factory = Diffusion.Sessions;

        var session = Connect(serverUrl, factory);

        if (session != null)
        {
            WriteLine("The session has been created.");
        }

        Thread.Sleep(60000);

        session.Close();
    }

    public ISession Connect(string url, ISessionFactory initialFactory)
    {
        try
        {
            string principal = "control";
            string password = "password";

            var factory = initialFactory
                .Principal(principal)
                .Credentials(Diffusion.Credentials.Password(password))
                .CertificateValidation((cert, chain, errors)
                    => CertificateValidationResult.ACCEPT)
                .ReconnectionTimeout(maximumTimeoutDuration)
                .ReconnectionStrategy(reconnectionStrategy)
                .SessionStateChangedHandler(OnSessionStateChanged);

            return factory.Open(url);
        }
        catch (Exception ex)
        {
            WriteLine($"Session connection error : {ex}.");
        }

        return null;
    }

    private void OnSessionStateChanged(object sender, SessionListenerEventArgs e)
    {
        if (e.NewState == SessionState.RECOVERING_RECONNECT)
        {
            // The session has been disconnected, and has entered
            // recovery state. It is during this state that
            // the reconnect strategy will be called
            WriteLine("The session has been disconnected.");
        }

        if (e.NewState == SessionState.CONNECTED_ACTIVE)
        {
            // The session has connected for the first time, or it has
            // been reconnected.
            reconnectionStrategy.Retries = 0;

            WriteLine("The session has connected.");
        }

        if (e.OldState == SessionState.RECOVERING_RECONNECT)
        {
            // The session has left recovery state. It may either be
            // attempting to reconnect, or the attempt has been aborted;
            // this will be reflected in the newState.
        }

        if (e.NewState == SessionState.CLOSED_BY_CLIENT)
        {
            WriteLine("The session has been closed.");
        }
    }

    /// <summary>
    /// A reconnection strategy that gets applied after the connection failure notification.
    /// </summary>
    private class ReconnectionStrategy : IReconnectionStrategy
    {
        public int Retries { get; set; }

        // Set the maximum interval between reconnect attempts to 60 seconds.
        private long maximumAttemptInterval = 1000 * 60;

        public ReconnectionStrategy() => Retries = 0;

        public async Task PerformReconnection(IReconnectionAttempt reconnection)
        {
            long wait = Math.Min((long)Math.Pow(2, Retries++) * 100L, maximumAttemptInterval);

            Thread.Sleep((int)wait);

            WriteLine("Attempting to reconnect...");

            reconnection.Start();
        }
    }
}
Java and Android
final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {

    private int retries = 0;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @Override
    public void performReconnection(ReconnectionAttempt reconnectionAttempt) {

        if (retries < 5) {
            int delay = (int) Math.pow(2, retries); // Exponential backoff
            retries++;
            scheduler.schedule(reconnectionAttempt::start, delay, TimeUnit.SECONDS);
        }
        else {
            reconnectionAttempt.abort();
        }
    }
};

final Session session = Diffusion.sessions()
    .principal("admin")
    .password("password")
    .reconnectionStrategy(reconnectionStrategy)
    .open("ws://localhost:8080");
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include <time.h>

#include "diffusion.h"

/*
 * This callback is used when the session state changes, e.g. when a session
 * moves from a "connecting" to a "connected" state, or from "connected" to
 * "closed".
 */
static void on_session_state_changed(SESSION_T *session,
        const SESSION_STATE_T old_state,
        const SESSION_STATE_T new_state)
{
        printf("Session state changed from %s (%d) to %s (%d)\n",
               session_state_as_string(old_state), old_state,
               session_state_as_string(new_state), new_state);
}


typedef struct {
        double current_wait;
        double max_wait;
} BACKOFF_STRATEGY_ARGS_T;


static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy(
        SESSION_T *session,
        void *args)
{
        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        printf("Waiting for %f ms\n", backoff_args->current_wait);

        sleep(backoff_args->current_wait);

        // But only up to some maximum time.
        if(backoff_args->current_wait > backoff_args->max_wait) {
                backoff_args->current_wait = backoff_args->max_wait;
        }

        return RECONNECTION_ATTEMPT_ACTION_START;
}


static void backoff_success(SESSION_T *session, void *args)
{
        printf("Reconnection successful\n");

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
        backoff_args->current_wait = 0; // Reset wait.
}


static void backoff_failure(SESSION_T *session, void *args)
{
        printf("Reconnection failed (%s)\n", session_state_as_string(session->state));

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        // Exponential backoff.
        if(backoff_args->current_wait == 0) {
                backoff_args->current_wait = 0.01;
        }
        else {
                backoff_args->current_wait *= 2;
        }
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_LISTENER_T session_listener = {
                .on_state_changed = on_session_state_changed
        };

        // Set the arguments to our exponential backoff strategy
        BACKOFF_STRATEGY_ARGS_T backoff_args = {
                .current_wait = 0,
                .max_wait = 5000
        };

        // Create the backoff strategy
        RECONNECTION_STRATEGY_T *reconnection_strategy =
                make_reconnection_strategy_user_function(
                        backoff_reconnection_strategy,
                        &backoff_args,
                        backoff_success,
                        backoff_failure);

        // Only retry for 30 seconds
        reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);

        // Create a session, synchronously
        session = session_create(
                url,
                principal,
                credentials,
                &session_listener,
                reconnection_strategy,
                &error);
        if(session != NULL) {
                char *sid_str = session_id_to_string(session->id);
                printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
                free(sid_str);
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
        }

        // With the exception of backoff_args, the reconnection strategy is
        // copied withing session_create() and may be freed now
        free_reconnection_strategy(reconnection_strategy);

        // Sleep for a while
        sleep(20);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
class ExponentialBackoffReconnectionStrategy: NSObject, PTDiffusionSessionReconnectionStrategy {

    var attempt_count : Int = 0

    func diffusionSession(_ session: PTDiffusionSession,
                          wishesToReconnectWith attempt: PTDiffusionSessionReconnectionAttempt) {

        // limit the maximum delay time between reconnection attempts to 60 seconds
        let maximum_attempt_interval = 60.0

        // compute delay for exponential backoff on the number of attempts so far
        self.attempt_count += 1
        let delay = min(pow(2.0, Double(self.attempt_count)) * 0.1, maximum_attempt_interval)

        // schedule asynchronous execution
        let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(Int(delay))
        DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
            print("Attempting to reconnect now")
            attempt.start()
        }
    }
}


func connect() {
    let configuration = PTDiffusionMutableSessionConfiguration()

    // Set the maximum amount of time we'll try and reconnect for to 10 minutes
    configuration.reconnectionTimeout = NSNumber(value: 10.0 * 60.0)

    // Set the reconnection strategy to be used
    configuration.reconnectionStrategy = ExponentialBackoffReconnectionStrategy()

    // Use the configuration to open a new session...
    PTDiffusionSession.open(with: URL(string: "url")!,
                            configuration: configuration) { (session, error) in

        // Check error is `nil`, then use session as required.
        // Ensure to maintain a strong reference to the session beyond the lifetime
        // of this callback, for example by assigning it to an instance variable.

        if (session == nil) {
            print("Failed to open session: %@", error!.localizedDescription)
            return
        }

        // At this point we now have a connected session.
        print("Connected.")
    }
}