Source code for pyprediktormapclient.opc_ua

import asyncio
import copy
import json
import logging
from asyncio import Semaphore
from datetime import date, datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Union

import aiohttp
import nest_asyncio
import pandas as pd
import requests
from aiohttp import ClientSession
from pydantic import AnyUrl, BaseModel
from pydantic_core import Url
from requests import HTTPError

from pyprediktormapclient.shared import request_from_api

# Patch asyncio at import time.
# NOTE(review): presumably this lets AsyncIONotebookHelper.run_coroutine call
# run_until_complete() while a loop is already running (e.g. in a notebook)
# — confirm against the nest_asyncio documentation.
nest_asyncio.apply()


# Module-level logger named after this module. Only a NullHandler is attached
# here, so this module does not configure any log output of its own.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class Variables(BaseModel):
    """Node identifier model used when parsing the values APIs.

    Variables are described in
    https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.1/

    Variables:
        Id: str - Id of the signal, e.g. SSO.EG-AS.WeatherSymbol
        Namespace: int - Namespace on the signal, e.g. 2.
        IdType: int - IdTypes described in
            https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.3/.
    """

    Id: str
    Namespace: int
    IdType: int
class SubValue(BaseModel):
    """Type/body pair carried inside a value when parsing the values APIs.

    Variables:
        Type: int - Type of variable, e.g. 12. string, 11. float, etc. The
            list of types is described in
            https://reference.opcfoundation.org/Core/Part6/v104/5.1.2/
        Body: Union[str, float, int, bool] - The value of the variable,
            should match type.
    """

    Type: int
    # The member order of this Union is kept exactly as declared originally.
    Body: Union[str, float, int, bool]
class HistoryValue(BaseModel):
    """Wrapper around a single SubValue when parsing the values APIs.

    Variables:
        Value: SubValue - Containing Type and Body (value) of the variable.
            Described in the SubValue class.
    """

    Value: SubValue
class StatsCode(BaseModel):
    """Status code model used when parsing the values APIs.

    Both fields are optional and default to None.

    Variables:
        Code: Optional[int] - Status code, described in
            https://reference.opcfoundation.org/v104/Core/docs/Part8/A.4.3/
        Symbol: Optional[str] - String value for status code, described in
            https://reference.opcfoundation.org/v104/Core/docs/Part8/A.4.3/
    """

    Code: Union[int, None] = None
    Symbol: Union[str, None] = None
class Value(BaseModel):
    """A value with its timestamps and status when parsing the values APIs.

    Only ``Value`` and ``SourceTimestamp`` are required; every other field
    defaults to None.

    Variables:
        Value: SubValue - Containing Type and Body (value) of the variable.
            Described in the SubValue class.
        SourceTimestamp: datetime - Timestamp of the source, e.g. when coming
            from an API the timestamp returned from the API for the variable
            is the source timestamp.
        SourcePicoseconds: Optional[int] - Picoseconds for the timestamp of
            the source if there is a need for a finer granularity.
        ServerTimestamp: Optional[datetime] - Timestamp for the server,
            normally this is assigned by the server.
        ServerPicoseconds: Optional[int] - Picoseconds for the timestamp on
            the server, normally this is assigned by the server.
        StatusCode: Optional[StatsCode] - Status code, described in
            https://reference.opcfoundation.org/v104/Core/docs/Part8/A.4.3/
    """

    Value: SubValue
    SourceTimestamp: datetime
    SourcePicoseconds: Union[int, None] = None
    ServerTimestamp: Union[datetime, None] = None
    ServerPicoseconds: Union[int, None] = None
    StatusCode: Union[StatsCode, None] = None
class WriteVariables(BaseModel):
    """One entry for the write values API: a node and the value to set.

    Variables:
        NodeId: Variables - The complete node id for the variable.
        Value: Value - The value to update for the node id.
    """

    NodeId: Variables
    Value: Value
class WriteHistoricalVariables(BaseModel):
    """One entry for the write historical values API.

    Variables:
        NodeId: Variables - The complete node id for the variable.
        PerformInsertReplace: int - Historical insertion method
            1. Insert, 2. Replace, 3. Update, 4. Remove
        UpdateValues: List[Value] - List of values to update for the node id.

    NOTE(review): OPC_UA.write_historical_values in this module raises
    ValueError unless each SourceTimestamp is strictly smaller than the next
    one in UpdateValues — confirm the ordering the server expects.
    """

    NodeId: Variables
    PerformInsertReplace: int
    UpdateValues: List[Value]
class WriteVariablesResponse(BaseModel):
    """Response model holding a list of status codes for a write request.

    Variables:
        SymbolCodes: List[StatsCode] - A list of StatsCode entries, described
            in the StatsCode class.

    NOTE(review): OPC_UA.write_values in this module reads the key
    "StatusCodes" from the raw response, not "SymbolCodes" — confirm which
    name the API returns.
    """

    SymbolCodes: List[StatsCode]
class WriteReturn(BaseModel):
    """Pairs API input with API output to show which node writes succeeded.

    Variables:
        Id: str - The Id of the signal
        Value: str - The written value of the signal
        TimeStamp: str - The SourceTimestamp of the written signal
        Success: bool - Success flag for the write operation
    """

    Id: str
    Value: str
    TimeStamp: str
    Success: bool
class AsyncIONotebookHelper:
    """Run a coroutine to completion from synchronous code."""

    @staticmethod
    def run_coroutine(coroutine):
        """Run ``coroutine`` on the current event loop and return its result.

        Args:
            coroutine: The awaitable to drive to completion.

        Returns:
            Whatever the coroutine returns.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop is set for this thread (always the case in a non-main
            # thread, and on newer Python versions whenever none was set).
            # Create one and register it so later calls reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(coroutine)
class Config:
    """Holds the single setting ``arbitrary_types_allowed = True``.

    NOTE(review): nothing in this file references this module-level class;
    confirm whether it is still needed.
    """

    arbitrary_types_allowed = True
class OPC_UA:
    """Helper functions to access the OPC UA REST Values API server.

    Args:
        rest_url (str): The complete url of the OPC UA Values REST API. E.g. "http://127.0.0.1:13371/"
        opcua_url (str): The complete url of the OPC UA Server that is passed on to the REST server. E.g. "opc.tcp://127.0.0.1:4872"
        namespaces (list): An optional but recommended ordered list of namespaces so that IDs match

    Returns:
        Object

    Todo:
        * Clean up use of files
        * Better session handling with aiohttp
        * Make sure that time convertions are with timezone
    """

    class Config:
        # NOTE(review): OPC_UA does not derive from BaseModel in this file, so
        # this setting looks unused here — confirm before removing.
        arbitrary_types_allowed = True

    def __init__(
        self,
        rest_url: AnyUrl,
        opcua_url: AnyUrl,
        namespaces: List = None,
        auth_client: object = None,
        session: requests.Session = None,
    ):
        """Class initializer.

        Args:
            rest_url (str): The complete url of the OPC UA Values REST API. E.g. "http://127.0.0.1:13371/"
            opcua_url (str): The complete url of the OPC UA Server that is passed on to the REST server. E.g. "opc.tcp://127.0.0.1:4872"
            namespaces (list): An optional but recommended ordered list of namespaces so that IDs match
            auth_client (object): Optional client; when it carries a token, the
                token's ``session_token`` is sent as a Bearer Authorization header.
            session (requests.Session): Optional session, stored on the instance.

        Raises:
            ValueError: If ``opcua_url`` does not start with "opc.tcp://".

        Returns:
            Object: The initialized class object
        """
        # Lookup table from numeric type id to type name, used by _process_df.
        self.TYPE_DICT = {t["id"]: t["type"] for t in TYPE_LIST}
        self.rest_url = rest_url
        self.opcua_url = opcua_url
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "text/plain",
        }
        self.auth_client = auth_client
        self.session = session
        self.helper = AsyncIONotebookHelper()

        if not str(self.opcua_url).startswith("opc.tcp://"):
            raise ValueError("Invalid OPC UA URL")

        if self.auth_client is not None and self.auth_client.token is not None:
            self.headers["Authorization"] = (
                f"Bearer {self.auth_client.token.session_token}"
            )

        # Base request body shared by every call; methods copy and extend it.
        self.body = {
            "Connection": {"Url": self.opcua_url, "AuthenticationType": 1}
        }
        if namespaces:
            self.body["ClientNamespaces"] = namespaces

    def json_serial(self, obj):
        """JSON serializer for objects not serializable by default json code.

        Dates and datetimes become ISO-8601 strings and ``Url`` objects become
        plain strings.

        Raises:
            TypeError: For any other type.
        """
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        elif isinstance(obj, Url):
            return str(obj)
        raise TypeError(f"Type {type(obj)} not serializable")

    def check_auth_client(self, content):
        """Refresh the auth token when an error payload reports code 404.

        Args:
            content (dict): Decoded error body from a failed request.

        Raises:
            RuntimeError: With the payload's "ErrorMessage" when the error
                code is anything other than 404 (including a payload that has
                no "error" entry at all).
        """
        # A payload without an "error" entry must surface as RuntimeError,
        # not crash with AttributeError on None.
        error = content.get("error") or {}
        if error.get("code") == 404:
            self.auth_client.request_new_ory_token()
            self.headers["Authorization"] = (
                f"Bearer {self.auth_client.token.session_token}"
            )
        else:
            raise RuntimeError(content.get("ErrorMessage"))

    def _get_value_type(self, id: int) -> Dict:
        """Internal function to get the type of a value from the OPC UA
        return, as documented at
        https://docs.prediktor.com/docs/opcuavaluesrestapi/datatypes.html#variant

        Args:
            id (int): An integer in the range 1-25 representing the id

        Returns:
            dict: Dictionary with keys "id", "type" and "description". All with None values if not found
        """
        return next(
            (sub for sub in TYPE_LIST if sub["id"] == id),
            {"id": None, "type": None, "description": None},
        )

    def _get_variable_list_as_list(self, variable_list: list) -> list:
        """Internal function to convert a list of pydantic Variable models to
        a list of dicts.

        Args:
            variable_list (List): List of pydantic models and/or dicts

        Raises:
            TypeError: If an item is neither a dict nor has ``model_dump``.

        Returns:
            List: List of dicts (dict inputs are passed through, not copied)
        """
        new_vars = []
        for var in variable_list:
            if hasattr(var, "model_dump"):
                # Convert pydantic model to dict
                new_vars.append(var.model_dump())
            elif isinstance(var, dict):
                # If var is already a dict, append as is
                new_vars.append(var)
            else:
                raise TypeError("Unsupported type in variable_list")
        return new_vars

    def _post_with_auth_retry(self, endpoint: str, payload, caller: str):
        """POST ``payload`` to ``endpoint``; retry once after an auth refresh.

        Args:
            endpoint (str): Endpoint path passed to ``request_from_api``.
            payload: JSON-serialisable request body.
            caller (str): Name of the public method, used in error messages.

        Raises:
            RuntimeError: If the first request fails and no auth client is
                configured, or fails with a non-HTTP error. The original
                exception is chained as the cause.

        Returns:
            Whatever ``request_from_api`` returns.
        """

        def send():
            # Serialised on every call so that the retry picks up the
            # refreshed headers and the current payload.
            return request_from_api(
                rest_url=self.rest_url,
                method="POST",
                endpoint=endpoint,
                data=json.dumps(payload, default=self.json_serial),
                headers=self.headers,
                extended_timeout=True,
            )

        try:
            return send()
        except HTTPError as e:
            if self.auth_client is None:
                raise RuntimeError(f"Error in {caller}: {str(e)}") from e
            self.check_auth_client(json.loads(e.response.content))
            # Errors raised by the retry propagate unchanged.
            return send()
        except Exception as e:
            raise RuntimeError(f"Error in {caller}: {str(e)}") from e

    def get_values(self, variable_list: List[Variables]) -> List:
        """Request realtime values from the OPC UA server.

        Args:
            variable_list (list): A list of variables you want, containing keys "Id", "Namespace" and "IdType"

        Raises:
            RuntimeError: If the request fails or the server answers with
                ``Success`` set to False.

        Returns:
            list: The input variable_list extended with "Timestamp", "Value", "ValueType", "StatusCode" and "StatusSymbol" (all defaults to None)
        """
        # Create a new variable list to remove pydantic models
        variables = self._get_variable_list_as_list(variable_list)
        body = copy.deepcopy(self.body)
        body["NodeIds"] = variables

        content = self._post_with_auth_retry("values/get", [body], "get_values")

        # Add default None values
        for var in variables:
            var["Timestamp"] = None
            var["Value"] = None
            var["ValueType"] = None
            var["StatusCode"] = None
            var["StatusSymbol"] = None

        # Return if no content from server
        if not isinstance(content, list) or not content:
            return variables

        # Choose first item and fail if not successful
        content = content[0]
        if content.get("Success") is False:
            raise RuntimeError(content.get("ErrorMessage"))

        # Return if missing values
        if not content.get("Values"):
            return variables

        # Pair request and response rows by position. zip() stops at the
        # shorter list, so rows without an answer keep their None defaults.
        for var, contline in zip(variables, content["Values"]):
            var["Timestamp"] = contline.get("ServerTimestamp")
            # Values are not present in the answer if not found
            if "Value" in contline:
                var["Value"] = contline["Value"].get("Body")
                var["ValueType"] = self._get_value_type(
                    contline["Value"].get("Type")
                ).get("type")
            # StatusCode is not always present in the answer
            if "StatusCode" in contline:
                var["StatusCode"] = contline["StatusCode"].get("Code")
                var["StatusSymbol"] = contline["StatusCode"].get("Symbol")

        return variables

    def _check_content(self, content: Dict[str, Any]) -> None:
        """Check the content returned from the server.

        Args:
            content (dict): The content returned from the server.

        Raises:
            RuntimeError: If the content is not a dictionary, if the request was not successful, or if the content does not contain 'HistoryReadResults'.
        """
        if not isinstance(content, dict):
            raise RuntimeError("No content returned from the server")
        if not content.get("Success"):
            raise RuntimeError(content.get("ErrorMessage"))
        if "HistoryReadResults" not in content:
            raise RuntimeError(
                "No history read results returned from the server"
            )

    def _process_df(
        self, df_result: pd.DataFrame, columns: Dict[str, str]
    ) -> pd.DataFrame:
        """Process the DataFrame returned from the server.

        Replaces numeric type ids in "Value.Type" with type names and renames
        the columns according to ``columns`` (in place).

        Args:
            df_result (pd.DataFrame): Combined history frame.
            columns (dict): Mapping of current column names to new names.

        Raises:
            KeyError: If a non-empty frame lacks one of the columns to rename.

        Returns:
            pd.DataFrame: The same frame object, processed.
        """
        # get_historical_values returns a column-less frame when the server
        # had no data; renaming with errors="raise" would fail on it.
        if df_result.columns.empty:
            return df_result

        if "Value.Type" in df_result.columns:
            df_result["Value.Type"] = df_result["Value.Type"].replace(
                self.TYPE_DICT
            )

        df_result.rename(columns=columns, errors="raise", inplace=True)
        return df_result

    async def _make_request(
        self, endpoint: str, body: dict, max_retries: int, retry_delay: int
    ):
        """POST ``body`` to ``endpoint`` with retries and exponential backoff.

        A new ``ClientSession`` is opened for every attempt. After a failed
        attempt that is not the last one, the coroutine sleeps for
        ``retry_delay * 2**attempt`` seconds.

        Raises:
            RuntimeError: "Max retries reached" once all attempts failed.

        Returns:
            The decoded JSON response of the first successful attempt.
        """
        for attempt in range(max_retries):
            try:
                logger.info("Attempt %s of %s", attempt + 1, max_retries)
                async with ClientSession() as session:
                    url = f"{self.rest_url}{endpoint}"
                    logger.info("Making POST request to %s", url)
                    logger.debug("Request body: %s", body)
                    # Never write the bearer token to the log.
                    safe_headers = {
                        key: "<redacted>"
                        if key.lower() == "authorization"
                        else value
                        for key, value in self.headers.items()
                    }
                    logger.debug("Request headers: %s", safe_headers)

                    # Serialise with json_serial, like the synchronous
                    # methods, so Url/datetime objects in the body work.
                    # self.headers already declares application/json.
                    payload = json.dumps(body, default=self.json_serial)
                    async with session.post(
                        url, data=payload, headers=self.headers
                    ) as response:
                        logger.info(
                            "Response received: Status %s", response.status
                        )

                        if response.status >= 400:
                            error_text = await response.text()
                            logger.error(
                                "HTTP error %s: %s",
                                response.status,
                                error_text,
                            )
                            # raise_for_status() is a plain method, not a
                            # coroutine, so it must not be awaited.
                            response.raise_for_status()

                        return await response.json()

            except aiohttp.ClientResponseError as e:
                logger.error("ClientResponseError: %s", e)
                if attempt == max_retries - 1:
                    raise RuntimeError("Max retries reached") from e
            except aiohttp.ClientError as e:
                logger.error("ClientError in POST request: %s", e)
            except Exception as e:
                logger.error("Unexpected error in _make_request: %s", e)

            if attempt < max_retries - 1:
                wait_time = retry_delay * (2**attempt)
                logger.warning(
                    "Request failed. Retrying in %s seconds...", wait_time
                )
                await asyncio.sleep(wait_time)

        logger.error("Max retries reached.")
        raise RuntimeError("Max retries reached")

    def _process_content(self, content: dict) -> pd.DataFrame:
        """Turn a history response into one DataFrame.

        Each entry of "HistoryReadResults" is flattened with
        ``pd.json_normalize`` and tagged with its NodeId fields as
        "HistoryReadResults.NodeId.<key>" columns.

        Raises:
            RuntimeError: If ``_check_content`` rejects the response.

        Returns:
            pd.DataFrame: The concatenated frame, or None when
            "HistoryReadResults" is empty.
        """
        self._check_content(content)
        df_list = []
        for item in content["HistoryReadResults"]:
            df = pd.json_normalize(item["DataValues"])
            for key, value in item["NodeId"].items():
                df[f"HistoryReadResults.NodeId.{key}"] = value
            df_list.append(df)

        if not df_list:
            # Callers filter out None results.
            return None

        df_result = pd.concat(df_list)
        df_result.reset_index(inplace=True, drop=True)
        return df_result

    async def get_historical_values(
        self,
        start_time: datetime,
        end_time: datetime,
        variable_list: List[str],
        endpoint: str,
        prepare_variables: Callable[[List[str]], List[dict]],
        additional_params: dict = None,
        max_data_points: int = 10000,
        max_retries: int = 3,
        retry_delay: int = 5,
        max_concurrent_requests: int = 30,
    ) -> pd.DataFrame:
        """Generic method to request historical values from the OPC UA server
        with batching.

        The request is split into variable batches and time batches; each
        combination is sent through ``_make_request`` with at most
        ``max_concurrent_requests`` requests in flight at once.

        Args:
            start_time (datetime): Start of the requested range.
            end_time (datetime): End of the requested range.
            variable_list (list): Variables passed to ``prepare_variables``.
            endpoint (str): REST endpoint to post to.
            prepare_variables (Callable): Builds the "ReadValueIds" entries.
            additional_params (dict): Extra keys merged into each request body.
            max_data_points (int): Budget used to derive the batch sizes.
            max_retries (int): Attempts per request.
            retry_delay (int): Base delay in seconds between attempts.
            max_concurrent_requests (int): Semaphore size.

        Returns:
            pd.DataFrame: All batch results concatenated, or an empty frame
            when no batch produced data.
        """
        total_time_range_ms = (end_time - start_time).total_seconds() * 1000
        estimated_intervals = total_time_range_ms / max_data_points

        extended_variables = prepare_variables(variable_list)

        if estimated_intervals == 0:
            # start_time == end_time: dividing by the interval estimate would
            # raise ZeroDivisionError. The batch size grows without bound as
            # the range shrinks, so send every variable in one batch.
            max_variables_per_batch = max(1, len(extended_variables))
        else:
            max_variables_per_batch = max(
                1, int(max_data_points / estimated_intervals)
            )
        max_time_batches = max(1, int(estimated_intervals / max_data_points))
        time_batch_size_ms = total_time_range_ms / max_time_batches

        variable_batches = [
            extended_variables[i : i + max_variables_per_batch]
            for i in range(0, len(extended_variables), max_variables_per_batch)
        ]

        semaphore = Semaphore(max_concurrent_requests)

        async def process_batch(variables, time_batch):
            async with semaphore:
                batch_start_ms = time_batch * time_batch_size_ms
                batch_end_ms = min(
                    (time_batch + 1) * time_batch_size_ms, total_time_range_ms
                )
                batch_start = start_time + timedelta(
                    milliseconds=batch_start_ms
                )
                batch_end = start_time + timedelta(milliseconds=batch_end_ms)

                body = {
                    **self.body,
                    "StartTime": batch_start.isoformat() + "Z",
                    "EndTime": batch_end.isoformat() + "Z",
                    "ReadValueIds": variables,
                    **(additional_params or {}),
                }

                content = await self._make_request(
                    endpoint, body, max_retries, retry_delay
                )
                return self._process_content(content)

        tasks = [
            process_batch(variables, time_batch)
            for variables in variable_batches
            for time_batch in range(max_time_batches)
        ]

        results = await asyncio.gather(*tasks)
        # _process_content yields None for a batch without history results.
        results = [df for df in results if df is not None]

        if not results:
            return pd.DataFrame()

        return pd.concat(results, ignore_index=True)

    async def get_historical_raw_values_asyn(
        self,
        start_time: datetime,
        end_time: datetime,
        variable_list: List[str],
        limit_start_index: Union[int, None] = None,
        limit_num_records: Union[int, None] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Request raw historical values from the OPC UA server.

        A "Limit" entry is added to the request only when both
        ``limit_start_index`` and ``limit_num_records`` are given. Extra
        keyword arguments are forwarded to ``get_historical_values``.

        Returns:
            pd.DataFrame: History frame with renamed columns ("ValueType",
            "Value", "Timestamp", "IdType", "Id", "Namespace").
        """
        additional_params = {}
        if limit_start_index is not None and limit_num_records is not None:
            additional_params["Limit"] = {
                "StartIndex": limit_start_index,
                "NumRecords": limit_num_records,
            }

        combined_df = await self.get_historical_values(
            start_time,
            end_time,
            variable_list,
            "values/historical",
            lambda vars: [{"NodeId": var} for var in vars],
            additional_params,
            **kwargs,
        )
        columns = {
            "Value.Type": "ValueType",
            "Value.Body": "Value",
            "SourceTimestamp": "Timestamp",
            "HistoryReadResults.NodeId.IdType": "IdType",
            "HistoryReadResults.NodeId.Id": "Id",
            "HistoryReadResults.NodeId.Namespace": "Namespace",
        }
        return self._process_df(combined_df, columns)

    def get_historical_raw_values(self, *args, **kwargs):
        """Synchronous wrapper around ``get_historical_raw_values_asyn``."""
        result = self.helper.run_coroutine(
            self.get_historical_raw_values_asyn(*args, **kwargs)
        )
        return result

    async def get_historical_aggregated_values_asyn(
        self,
        start_time: datetime,
        end_time: datetime,
        pro_interval: int,
        agg_name: str,
        variable_list: List[str],
        **kwargs,
    ) -> pd.DataFrame:
        """Request historical aggregated values from the OPC UA server.

        ``pro_interval`` is sent as "ProcessingInterval" and ``agg_name`` as
        "AggregateName" (both in the body and on every ReadValueId). Extra
        keyword arguments are forwarded to ``get_historical_values``.

        Returns:
            pd.DataFrame: History frame with renamed columns, including
            "StatusSymbol" and "StatusCode".
        """
        additional_params = {
            "ProcessingInterval": pro_interval,
            "AggregateName": agg_name,
        }

        combined_df = await self.get_historical_values(
            start_time,
            end_time,
            variable_list,
            "values/historicalaggregated",
            lambda vars: [
                {"NodeId": var, "AggregateName": agg_name} for var in vars
            ],
            additional_params,
            **kwargs,
        )
        columns = {
            "Value.Type": "ValueType",
            "Value.Body": "Value",
            "StatusCode.Symbol": "StatusSymbol",
            "StatusCode.Code": "StatusCode",
            "SourceTimestamp": "Timestamp",
            "HistoryReadResults.NodeId.IdType": "IdType",
            "HistoryReadResults.NodeId.Id": "Id",
            "HistoryReadResults.NodeId.Namespace": "Namespace",
        }
        return self._process_df(combined_df, columns)

    def get_historical_aggregated_values(self, *args, **kwargs):
        """Synchronous wrapper around
        ``get_historical_aggregated_values_asyn``."""
        result = self.helper.run_coroutine(
            self.get_historical_aggregated_values_asyn(*args, **kwargs)
        )
        return result

    def write_values(self, variable_list: List[WriteVariables]) -> List:
        """Request to write realtime values to the OPC UA server.

        Args:
            variable_list (list): A list of variables you want to write to with the value, timestamp and quality, containing keys "Id", "Namespace", "Values" and "IdType".

        Raises:
            RuntimeError: If the request fails or the server answers with
                ``Success`` set to False.
            ValueError: If the answer carries no "StatusCodes".

        Returns:
            list: The input variable_list where every entry is extended with
            "WriteSuccess" (True when the matching status code is 0), or None
            when the server answer is not a dict.
        """
        # Create a new variable list to remove pydantic models
        variables = self._get_variable_list_as_list(variable_list)
        body = copy.deepcopy(self.body)
        body["WriteValues"] = variables

        content = self._post_with_auth_retry(
            "values/set", [body], "write_values"
        )

        # Return if no content from server
        if not isinstance(content, dict):
            return None
        if content.get("Success") is False:
            raise RuntimeError(content.get("ErrorMessage"))
        if content.get("StatusCodes") is None:
            raise ValueError(
                "No status codes returned, might indicate no values written"
            )

        # The API only returns a list, so the response is assumed to use the
        # same index order as the request.
        for num, row in enumerate(variables):
            row["WriteSuccess"] = (
                content["StatusCodes"][num].get("Code") == 0
            )

        return variables

    def write_historical_values(
        self, variable_list: List[WriteHistoricalVariables]
    ) -> List:
        """Request to write historical values to the OPC UA server.

        Args:
            variable_list (list): A list of variables you want, containing keys "Id", "Namespace", "Values" and "IdType". Within each entry, every "SourceTimestamp" in "UpdateValues" must be strictly smaller than the next one.

        Raises:
            ValueError: If the timestamps of an entry are not strictly
                increasing, or the answer has no "HistoryUpdateResults".
            RuntimeError: If the request fails or the server answers with
                ``Success`` set to False.

        Returns:
            list: The input variable_list where every entry is extended with
            "WriteSuccess" and, on failure, "WriteError"; or None when the
            server answer is not a dict.
        """
        # Convert first: pydantic models support neither .get() nor
        # subscripting, so the order check must run on plain dicts.
        variables = self._get_variable_list_as_list(variable_list)

        # Check if data is in correct order, if wrong fail.
        for variable in variables:
            update_values = variable.get("UpdateValues", [])
            for current, following in zip(update_values, update_values[1:]):
                if not (
                    current["SourceTimestamp"] < following["SourceTimestamp"]
                ):
                    raise ValueError(
                        "Time for variables not in correct order."
                    )

        body = copy.deepcopy(self.body)
        body["UpdateDataDetails"] = variables

        # This endpoint takes the body itself, not wrapped in a list.
        content = self._post_with_auth_retry(
            "values/historicalwrite", body, "write_historical_values"
        )

        # Return if no content from server
        if not isinstance(content, dict):
            return None
        # Fail if success is false
        if content.get("Success") is False:
            raise RuntimeError(content.get("ErrorMessage"))
        # Fail if history report is missing
        if content.get("HistoryUpdateResults") is None:
            raise ValueError(
                "No status codes returned, might indicate no values written"
            )

        # The API only returns a list, so the response is assumed to use the
        # same index order as the request. An empty result means success.
        for num_var, variable_row in enumerate(variables):
            update_result = content["HistoryUpdateResults"][num_var]
            if update_result == {}:
                variable_row["WriteSuccess"] = True
            else:
                variable_row["WriteSuccess"] = False
                variable_row["WriteError"] = update_result.get("StatusCode")

        return variables

    def check_if_ory_session_token_is_valid_refresh(self):
        """Refresh the auth client's token if it reports it has expired."""
        if self.auth_client.check_if_token_has_expired():
            self.auth_client.refresh_token()
# (type name, description) pairs; the position in this tuple is the type id.
_BUILTIN_TYPES = (
    ("Null", "An invalid or unspecified value"),
    ("Boolean", "A boolean logic value (true or false)"),
    ("SByte", "An 8 bit signed integer value"),
    ("Byte", "An 8 bit unsigned integer value"),
    ("Int16", "A 16 bit signed integer value"),
    ("UInt16", "A 16 bit unsigned integer value"),
    ("Int32", "A 32 bit signed integer value"),
    ("UInt32", "A 32 bit unsigned integer value"),
    ("Int64", "A 64 bit signed integer value"),
    ("UInt64", "A 64 bit unsigned integer value"),
    ("Float", "An IEEE single precision (32 bit) floating point value"),
    ("Double", "An IEEE double precision (64 bit) floating point value"),
    ("String", "A sequence of Unicode characters"),
    ("DateTime", "An instance in time"),
    ("Guid", "A 128-bit globally unique identifier"),
    ("ByteString", "A sequence of bytes"),
    ("XmlElement", "An XML element"),
    (
        "NodeId",
        "An identifier for a node in the address space of a UA server",
    ),
    (
        "ExpandedNodeId",
        "A node id that stores the namespace URI instead of the namespace index",
    ),
    ("StatusCode", "A structured result code"),
    ("QualifiedName", "A string qualified with a namespace"),
    (
        "LocalizedText",
        "A localized text string with an locale identifier",
    ),
    (
        "ExtensionObject",
        "An opaque object with a syntax that may be unknown to the receiver",
    ),
    (
        "DataValue",
        "A data value with an associated quality and timestamp",
    ),
    ("Variant", "Any of the other built-in types"),
    (
        "DiagnosticInfo",
        "A diagnostic information associated with a result code",
    ),
)

# List of dicts with keys "id", "type" and "description", ids 0 through 25.
TYPE_LIST = [
    {"id": type_id, "type": type_name, "description": description}
    for type_id, (type_name, description) in enumerate(_BUILTIN_TYPES)
]