Programming connection store

Programming connection store

Neuko provides an abstract class of ConnectionStore to make it flexible for developer to choose on how to save, retrieve check for existence and delete perpetual connection configuration file.

The file will be sent to the device during Activation request within provisioning process as describe in this guide.

For illustration, let’s assume that the connection configuration file is stored in will be saved as /secured/config/perpetual/connection.config.json.

import { DeviceIdentifier, ConnectionStore } from "@neukolabs/device-sdk-js";
import { readFileSync, writeFileSync, unlinkSync, existsSync } from 'fs';
import path from 'path';

class connectionStore extends ConnectionStore {

    constructor() {
        super();
    }

    readonly FILENAME   = "./secured/config/perpetual/connection.config.json";

    public getPerpetualConnectionSettings(deviceIdentifier: DeviceIdentifier): Promise<string> | string {
        return readFileSync(this.FILENAME).toString();
    }

    public savePerpetualConnectionSettings(deviceIdentifier: DeviceIdentifier, settings: string): Promise<void> | void {
        writeFileSync(this.FILENAME, settings as string);
    }

    public deletePerpetualConnectionSettings(deviceIdentifier: DeviceIdentifier): Promise<void> | void {
        return unlinkSync(this.FILENAME);
    }

    public isPerpetualConnectionSettingsExists(deviceIdentifier: DeviceIdentifier): Promise<boolean> | boolean {
        return existsSync(this.FILENAME);
    }
}


 

import os
from neuko.device.model import DeviceIdentifier
from neuko.connection.connectionStore import ConnectionStore

class DeviceConnectionStore(ConnectionStore):

    FILEPATH = '/opt/secured/config/perpetual/connection.config.json'

    async def getPerpetualConnectionSettings(self, deviceIdentifier: DeviceIdentifier) -> str:
        fd = open(self.FILEPATH, mode="r")
        raw = json.load(fd)
        fd.close()
        return raw

    async def savePerpetualConnectionSettings(self, deviceIdentifier: DeviceIdentifier, settings: str) -> bool:
        fd = open(self.FILEPATH, mode="w")
        json.dump(settings, fd)
        fd.close()
        return True

    async def deletePerpetualConnectionSettings(self, deviceIdentifier: DeviceIdentifier) -> bool:
        if self.isPerpetualConnectionSettingsExists(deviceIdentifier):
            os.remove(self.FILEPATH)
        return True

    async def isPerpetualConnectionSettingsExists(self, deviceIdentifier: DeviceIdentifier) -> bool:
        return os.path.exists(self.FILEPATH)


 

Scroll to Top