#!/usr/bin/env python

"""
Usage: test_pycohal -c <connections xml file> [-v] [other options for unittest module]
Executes the tests of the Python bindings of uHAL (pycohal)

Mandatory option:
   -c, --connections-file    Connections xml file used in tests

Other options:
   -v                        Verbose reporting (one line per test)
"""

from random import randint
import array
import numpy
import sys, os, getopt
import unittest
import uhal
import time

# USEFUL FUNCTIONS

def randuint32():
    """Return a random integer in the inclusive range [0, 2^32 - 1]."""
    largest_uint32 = 0xffffffff
    return randint(0, largest_uint32)

def list_randuint32s(length):
    """Return a list holding `length` values, each obtained from a separate randuint32() call."""
    return [randuint32() for _ in range(length)]


##################################################################################
# SECTION 1:
#    Tests specific to python bindings - e.g. conversion of objects across C/python boundary.
#    (Doesn't involve any actual interaction with H/W)

class TestEnums(unittest.TestCase):
    """Checks that the wrapped enums compare correctly under both == and != ."""

    def gen_enum_list_BlockMode(self):
        """Return a new list of the uhal.BlockReadWriteMode values used in the comparison test."""
        modes = uhal.BlockReadWriteMode
        return [modes.SINGLE, modes.INCREMENTAL, modes.NON_INCREMENTAL, modes.HIERARCHICAL]

    def gen_enum_list_NodePerm(self):
        """Return a new list of the uhal.NodePermission values used in the comparison test."""
        permissions = uhal.NodePermission
        return [permissions.READ, permissions.WRITE, permissions.READWRITE]

    def gen_enum_list_LogLevel(self):
        """Return a new list of the uhal.LogLevel values used in the comparison test."""
        levels = uhal.LogLevel
        return [levels.DEBUG, levels.INFO, levels.NOTICE, levels.WARNING, levels.ERROR, levels.FATAL]

    def exe_equality_test(self, func_gen_enum_list):
        """
        Compare every pair of values taken from two separate calls of func_gen_enum_list.
        Values at the same position must be equal; values at different positions must differ.
        """
        first_batch = func_gen_enum_list()
        second_batch = func_gen_enum_list()
        for pos_a, lhs in enumerate(first_batch):
            for pos_b, rhs in enumerate(second_batch):
                if pos_a != pos_b:
                    # Explicit use of != so that the inequality operator itself is exercised
                    self.assertTrue(lhs != rhs)
                    continue
                self.assertEqual(lhs, rhs)

    def test_NodePermission_equality(self):
        self.exe_equality_test(self.gen_enum_list_NodePerm)

    def test_BlockReadWriteMode_equality(self):
        self.exe_equality_test(self.gen_enum_list_BlockMode)

    def test_LogLevel_equality(self):
        self.exe_equality_test(self.gen_enum_list_LogLevel)


class TestUInt32s(unittest.TestCase):
    """
    Checks on uint32 handling at the C-python boundary:
              1) uint32 arguments and return values come through unchanged
              2) an exception is raised when an invalid value is given for a uint32 argument
    (Certain flavours of python bindings do not always assure this on 32-bit machines.)
    """

    def gen_list_invalid_numbers(self):
        """Return a list of values that do not fit in a uint32: negative numbers, and numbers >= 2^32."""
        rejected = [-1, -2, -3, -5]
        rejected.extend(randint(-10000, -5) for _ in range(10))
        # Each too-large value appears twice: once written in hex, once in decimal
        rejected.extend([0x100000000, 4294967296])  # 2^32
        rejected.extend([0x100000001, 4294967297])  # 2^32 +1
        rejected.extend([0x100000002, 4294967298])  # 2^32 +2
        return rejected

    def gen_list_valid_numbers(self):
        """Return a list of valid uint32 values, concentrated around the signed/unsigned 32-bit boundaries."""
        accepted = [0, 1, 2, 3, 10, 49]
        accepted.extend(randint(50, 0x7fffffff) for _ in range(10))  # 50 <= value <= 2^31-1
        # Each boundary value appears twice: once written in hex, once in decimal
        accepted.extend([0x7fffffff, 2147483647])  # 2^31 -1 (Max value of 32-bit signed int)
        accepted.extend([0x80000000, 2147483648])  # 2^31    (Above max value of 32-bit signed int)
        accepted.extend([0x80000001, 2147483649])  # 2^31 +1
        accepted.extend(randint(0x80000002, 0xfffffffe) for _ in range(10))  # 2^31+2 <= value <= 2^32-2
        accepted.extend([0xfffffffe, 4294967294])  # 2^32 -2
        accepted.extend([0xffffffff, 4294967295])  # 2^32 -1 (Max value of uint32)
        return accepted

    def test_uint32_as_argument(self):
        # The number is passed both as a python integer and as its decimal string
        # (any trailing "L" is stripped from the string form)
        for bad_value in self.gen_list_invalid_numbers():
            as_text = str(bad_value).rstrip("L")
            self.assertRaises(TypeError, uhal.tests.check_uint32_argument, bad_value, as_text)
        for good_value in self.gen_list_valid_numbers():
            as_text = str(good_value).rstrip("L")
            self.assertTrue(uhal.tests.check_uint32_argument(good_value, as_text))

    def test_uint32_as_return_value(self):
        # The value obtained from the decimal string must equal the original python integer
        for good_value in self.gen_list_valid_numbers():
            as_text = str(good_value).rstrip("L")
            self.assertEqual(good_value, uhal.tests.convert_str_to_uint32(as_text))


class TestConverters(unittest.TestCase):
    """
    Exercises the converters between python objects and C++ types.
    To-python converters (e.g. used for C++ return values):
        - vector<string> to list
        - vector<uint32_t> to list
    From-python converters (e.g. used for converting from python objects to C++ arguments):
        - vector<uint32_t> from list
    """

    def test_vec_uint32_from_list_and_to_list(self):
        # Lists that must come back from the copy unchanged
        round_trip = self.exe_test_vec_uint32_from_list_and_to_list
        round_trip([])
        round_trip([42, 56, 74])
        round_trip(list_randuint32s(100))
        round_trip(list_randuint32s(randint(1000, 10000)))

        # Lists with an element that is not a valid uint32: the conversion must raise TypeError
        rejected_lists = (
            [42, -1, 76],
            [3, 6, 22222222222222222222222222222222222222223],
            [1, 2.1, 3],
            [[], []],
            ['bob'],
        )
        for bad_list in rejected_lists:
            self.assertRaises(TypeError, uhal.tests.copy_vec_uint32, bad_list)

    def exe_test_vec_uint32_from_list_and_to_list(self, list_of_uint32s):
        """Pass the list through uhal.tests.copy_vec_uint32 and check that an identical python list comes back."""
        round_tripped = uhal.tests.copy_vec_uint32(list_of_uint32s)
        self.assertTrue(isinstance(round_tripped, list))
        self.assertEqual(len(round_tripped), len(list_of_uint32s))
        # Lengths are known to match at this point, so zip visits every element
        for copied, original in zip(round_tripped, list_of_uint32s):
            self.assertEqual(copied, original)

    def test_vec_uint32_to_list(self):
        """Test conversion of C++ vector<uint32_t> return values to python list"""
        check = self.exe_test_vec_uint32_to_list
        check([])
        check([randuint32()])
        check([1, 42, 99])
        for how_many in (100, 1000, 10000):
            check(list_randuint32s(how_many))

    def exe_test_vec_uint32_to_list(self, list_of_uint32s):
        """Send the values as a comma-separated string and check that the returned list equals the input."""
        comma_separated = ",".join(str(number) for number in list_of_uint32s)
        list_from_vector = uhal.tests.convert_str_to_vec_uint32(comma_separated)
        self.assertEqual(list_of_uint32s, list_from_vector)

    def test_vec_string_to_list(self):
        """Test conversion of C++ vector<string> return values to python list."""
        samples = (
            [],
            ['alpha'],
            ['a', 'b'],
            ['', ' ', 'test'],
            ['alphabet', 'b', 'c', 'd', 'e'],
        )
        for sample in samples:
            self.exe_test_vec_string_to_list(sample)

    def exe_test_vec_string_to_list(self, list_of_strings):
        """Send the strings joined by commas and check that the returned list equals the input."""
        comma_separated = ",".join(list_of_strings)
        list_from_vector = uhal.tests.convert_str_to_vec_str(comma_separated)
        self.assertEqual(list_of_strings, list_from_vector)


class TestLogLevels(unittest.TestCase):
    """TestCase sub-class checking that log-levels are set correctly."""

    def setUp(self):
        # Ordered so that setting any level is expected to include that level and all levels before it in this list
        self.allLevels = [uhal.LogLevel.FATAL, uhal.LogLevel.ERROR, uhal.LogLevel.WARNING, uhal.LogLevel.NOTICE, uhal.LogLevel.INFO, uhal.LogLevel.DEBUG ]

    def exe_test(self, logLevelSetTo, logLevelsIncluded):
        """
        Set the log level to logLevelSetTo, then check uhal.LoggingIncludes for every known level:
        it must be true for each level in logLevelsIncluded, and False for every other level in self.allLevels.
        """
        # Built in the fixed order of self.allLevels (rather than via sets) so that the
        # order of the checks is the same on every run and does not rely on the enums being hashable
        logLevelsExcluded = [ level for level in self.allLevels if level not in logLevelsIncluded ]
        uhal.setLogLevelTo( logLevelSetTo )
        for level in logLevelsIncluded:
            self.assertTrue( uhal.LoggingIncludes(level) )
        for level in logLevelsExcluded:
            self.assertEqual( uhal.LoggingIncludes(level), False )

    def test_LogLevels(self):
        uhal.disableLogging()

        try:
            # Walk through the levels in order; after each step one more level is expected to be included
            includedLevels = []
            for level in self.allLevels:
                includedLevels.append(level)
                self.exe_test( level , includedLevels)
        finally:
            # Reset log-level to FATAL so that logging lines don't clutter up tests
            # (in a finally block so that the reset also happens when one of the checks above fails)
            uhal.setLogLevelTo( uhal.LogLevel.FATAL )


class TestValWordIntMethods(unittest.TestCase):
    """
    TestCase sub-class testing numeric & comparison operators for ValWord_uint32

    With valid=True, the result of each operator is compared against the result of the same
    operator applied to plain python ints, for the int/ValWord operand combinations listed in each check.
    With valid=False, the operator methods of the ValWord are expected to raise uhal.NonValidatedMemory.
    """

    def test_int_methods(self):
       # Fixed cases first: 0xffffffff as either operand, with valid=False and valid=True
       self.exe_test_int_methods(0xffffffff, randuint32(), False)
       self.exe_test_int_methods(0xffffffff, randuint32(), True)
       self.exe_test_int_methods(randuint32(), 0xffffffff, False)
       self.exe_test_int_methods(randuint32(), 0xffffffff, True)
       # Then 200 random operand pairs, each run with valid=False and valid=True
       for i in range(200):
           (x1, x2) = (randuint32(), randuint32())
           self.exe_test_int_methods(x1, x2, False)
           self.exe_test_int_methods(x1, x2, True)

    def assertAllEqual(self, *args):
        """Assert that each argument after the first is equal to the first argument (needs at least 2 arguments)."""
        assert( len(args) > 1 )
        for i in range( len(args)-1 ):
            self.assertEqual(args[0], args[i+1])

    def exe_test_int_methods(self, x1, x2, valid):
        """
        Run all of the operator checks for one pair of operand values.

        x1, x2 : python ints; used directly as the reference operands, and as the first argument
                 of uhal.tests.get_dummy_ValWord when creating the ValWords under test
        valid  : passed as the second argument of uhal.tests.get_dummy_ValWord, and selects which
                 checks are run: comparison with python ints (True) or 'raises NonValidatedMemory' (False)
                 (presumably the validity flag of the created ValWords - confirm against uhal.tests)
        """
        reg1 = uhal.tests.get_dummy_ValWord(x1, valid)
        reg2 = uhal.tests.get_dummy_ValWord(x2, valid)

        # Mod-32 copies for pow & bitshifting
        s1 = x1 % 32
        sreg1 = uhal.tests.get_dummy_ValWord(s1, valid)
        s2 = x2 % 32
        sreg2 = uhal.tests.get_dummy_ValWord(s2, valid)

        # __invert__  ,  __neg__  ,  __pos__
        # (only checked for valid=True)
        if valid:
            self.assertEqual ( ~x1 , ~reg1 )
            self.assertEqual ( -x1 , -reg1 )
            self.assertEqual ( +x1 , +reg1 )

        # __(r)add__  ,  __(r)sub__  ,  __(r)mul__  ,  __(r)mod__
        # In the assertRaises calls below (and in later sections), "(x2)" is simply x2: the parentheses do not create a tuple
        if valid:
            self.assertAllEqual ( x1 + x2 , reg1 + x2 , x1 + reg2 , reg1 + reg2 )
            self.assertAllEqual ( x1 - x2 , reg1 - x2 , x1 - reg2 , reg1 - reg2 )
            self.assertAllEqual ( x1 * x2 , reg1 * x2 , x1 * reg2 , reg1 * reg2 )
            self.assertAllEqual ( x1 % x2 , reg1 % x2 , x1 % reg2 , reg1 % reg2 )
        else:
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__add__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__radd__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__sub__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__rsub__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__mul__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__rmul__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__mod__ , (x2) )
            self.assertRaises ( uhal.NonValidatedMemory, reg1.__rmod__ , (x2) )

        # __pow__  &  __rpow__
        # (valid case uses the mod-32 operands for both base and exponent)
        if valid:
            self.assertAllEqual ( s1 ** s2 , sreg1 ** s2 , s1 ** sreg2 , sreg1 ** sreg2 )
        else:
            self.assertRaises( uhal.NonValidatedMemory, reg1.__pow__ , (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rpow__ , (x2) )

        # __(r)lshift__  &  __(r)rshift__
        # (valid case uses the mod-32 operands for the shift amount only)
        if valid:
            self.assertAllEqual ( x1 << s2 , reg1 << s2 , x1 << sreg2 , reg1 << sreg2 )
            self.assertAllEqual ( x1 >> s2 , reg1 >> s2 , x1 >> sreg2 , reg1 >> sreg2 )
        else:
            self.assertRaises( uhal.NonValidatedMemory, reg1.__lshift__ , (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rlshift__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rshift__ , (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rrshift__, (x2) )

        # __(r)and__  ,  __(r)or__  &  __(r)xor__
        if valid:
            self.assertAllEqual ( x1 & x2 , reg1 & x2 , x1 & reg2 , reg1 & reg2 )
            self.assertAllEqual ( x1 | x2 , reg1 | x2 , x1 | reg2 , reg1 | reg2 )
            self.assertAllEqual ( x1 ^ x2 , reg1 ^ x2 , x1 ^ reg2 , reg1 ^ reg2 )
        else:
            self.assertRaises( uhal.NonValidatedMemory, reg1.__and__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rand__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__or__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__ror__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__xor__, (x2) )
            self.assertRaises( uhal.NonValidatedMemory, reg1.__rxor__, (x2) )

        # __nonzero__ (python 2)  /  __bool__ (python 3)
        # A ValWord created from 0 is checked in addition to reg1, so that the zero case is always covered
        if valid:
            x0 = 0
            reg0 = uhal.tests.get_dummy_ValWord(x0, True)
            if (sys.version_info[0] > 2):
                self.assertEqual ( x0.__bool__() , reg0.__bool__() )
                self.assertEqual ( x1.__bool__() , reg1.__bool__() )
            else:
                self.assertEqual ( x0.__nonzero__() , reg0.__nonzero__() )
                self.assertEqual ( x1.__nonzero__() , reg1.__nonzero__() )
            self.assertEqual ( bool(x0) , bool(reg0) )
            self.assertEqual ( bool(x1) , bool(reg1) )
        else:
            self.assertRaises( uhal.NonValidatedMemory, bool , (reg1) )

        # __cmp__ (python 2)  /  __lt__, __le__, __eq__, __ne__, __gt__, __ge__ (python 3)
        if valid:
            self.assertAllEqual ( x1 <  x2 , reg1 <  x2 , x1 <  reg2 , reg1 <  reg2 )
            self.assertAllEqual ( x1 <= x2 , reg1 <= x2 , x1 <= reg2 , reg1 <= reg2 )
            self.assertAllEqual ( x1 >  x2 , reg1 >  x2 , x1 >  reg2 , reg1 >  reg2 )
            self.assertAllEqual ( x1 >= x2 , reg1 >= x2 , x1 >= reg2 , reg1 >= reg2 )
            self.assertAllEqual ( x1 == x2 , reg1 == x2 , x1 == reg2 , reg1 == reg2 )
            self.assertAllEqual ( x1 != x2 , reg1 != x2 , x1 != reg2 , reg1 != reg2 )
            # Second ValWord created from the same value, so that == is checked between two separate objects
            reg1b = uhal.tests.get_dummy_ValWord(x1, True)
            self.assertAllEqual ( x1 == x1 , reg1 == x1 , x1 == reg1 , reg1 == reg1b )
        else:
            if (sys.version_info[0] > 2):
                self.assertRaises( uhal.NonValidatedMemory, reg1.__lt__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__le__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__eq__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__ne__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__gt__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__ge__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__lt__, (reg2) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__le__, (reg2) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__eq__, (reg2) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__ne__, (reg2) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__gt__, (reg2) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__ge__, (reg2) )
            else:
                self.assertRaises( uhal.NonValidatedMemory, reg1.__cmp__, (x1) )
                self.assertRaises( uhal.NonValidatedMemory, reg1.__cmp__, (reg2) )

        # __format__
        # (section only run for python 2.6 final or later, since it relies on str.format)
        if sys.hexversion >= 0x020600F0:
            if valid:
                self.assertEqual ( "{0:d}".format(x1) , "{0:d}".format(reg1) )
                self.assertEqual ( "{0:x}".format(x1) , "{0:x}".format(reg1) )
                self.assertAllEqual ( hex(x1), hex(reg1), "0x{0:x}".format(x1), "0x{0:x}".format(reg1) )
            else:
                self.assertRaises( uhal.NonValidatedMemory , reg1.__format__ , ("{0:d}") )
                self.assertRaises( uhal.NonValidatedMemory , reg1.__format__ , ("{0:x}") )



##################################################################################
# SECTION 2:
#    Tests of normal use cases, involving actual interaction with (dummy) hardware
#    (There is usually an equivalent C++ executable in uhal/tests.)


class TestConnectionManager(unittest.TestCase):
    """
    Verifies that devices obtained from a ConnectionManager, or from the module-level uhal.getDevice,
    report the expected ID, URI and client type (incl. for user-defined clients).
    """

    def setUp(self):
        self.manager = uhal.ConnectionManager(CONNECTIONS_FILE)

    def check_hw_properties(self, hw, id, uri, clientType):
        """Check ID and URI as reported by hw and by its client, then the exact type of the client."""
        reported_id = hw.id()
        self.assertEqual(reported_id, id)
        reported_uri = hw.uri()
        self.assertEqual(reported_uri, uri)

        client_id = hw.getClient().id()
        self.assertEqual(client_id, id)
        client_uri = hw.getClient().uri()
        self.assertEqual(client_uri, uri)

        client_type = type(hw.getClient())
        self.assertEqual(client_type, clientType)

    def exe_test_getDevice_member(self, id, uri, clientType):
        """Look the device up by ID in the connection manager created in setUp, then check its properties."""
        self.check_hw_properties(self.manager.getDevice(id), id, uri, clientType)

    def exe_test_getDevice_static(self, id, uri, clientType):
        """Create the device directly from ID, URI and ADDRESS_FILE via uhal.getDevice, then check its properties."""
        self.check_hw_properties(uhal.getDevice(id, uri, ADDRESS_FILE), id, uri, clientType)

    def test_getDevice_coreClients(self):
        core = uhal._core

        # (device ID, expected URI, expected client type) for devices looked up in the connections file
        from_connections_file = (
            ('dummy.udp', 'ipbusudp-1.3://localhost:50001', core._UDP_IPbus_1_3),
            ('dummy.udp2', 'ipbusudp-2.0://localhost:60001', core._UDP_IPbus_2_0),
            ('dummy.tcp', 'ipbustcp-1.3://localhost:50002', core._TCP_IPbus_1_3),
            ('dummy.tcp2', 'ipbustcp-2.0://localhost:60002', core._TCP_IPbus_2_0),
            ('dummy.controlhub', 'chtcp-1.3://localhost:10203?target=localhost:50001', core._TCP_ControlHub_IPbus_1_3),
            ('dummy.controlhub2', 'chtcp-2.0://localhost:10203?target=localhost:60001', core._TCP_ControlHub_IPbus_2_0),
            ('dummy.pcie2', 'ipbuspcie-2.0:///tmp/uhal_pcie_client2device,/tmp/uhal_pcie_device2client', uhal.ClientInterface),
        )
        for device_id, device_uri, expected_type in from_connections_file:
            self.exe_test_getDevice_member(device_id, device_uri, expected_type)

        # (device ID, URI, expected client type) for devices created directly from ID and URI
        created_directly = (
            ('alice', 'ipbusudp-1.3://localhost:50001', core._UDP_IPbus_1_3),
            ('bob', 'ipbusudp-2.0://localhost:50001', core._UDP_IPbus_2_0),
            ('charlie', 'ipbustcp-1.3://localhost:50001', core._TCP_IPbus_1_3),
            ('dave', 'ipbustcp-2.0://localhost:50001', core._TCP_IPbus_2_0),
            ('x', 'chtcp-1.3://localhost:12345?target=localhost:50001', core._TCP_ControlHub_IPbus_1_3),
            ('y', 'chtcp-2.0://localhost:12345?target=localhost:50001', core._TCP_ControlHub_IPbus_2_0),
            ('z', 'ipbuspcie-2.0:///path1,/path2', uhal.ClientInterface),
            ('z', 'ipbusmmap-2.0:///path/to/file', uhal.ClientInterface),
        )
        for device_id, device_uri, expected_type in created_directly:
            self.exe_test_getDevice_static(device_id, device_uri, expected_type)

        # A URI with a protocol that is not known must be rejected
        self.assertRaises(uhal.exception, uhal.getDevice, 'some_id', 'unknown_protocol://localhost:50001', ADDRESS_FILE)

    def test_getDevice_userClients(self):
        user_uri = '__test__://localhost:50001'

        # The '__test__' protocol is expected to work when it is named in the 4th argument of getDevice ...
        hw = uhal.getDevice('bob', user_uri, ADDRESS_FILE, ['__test__'])
        self.check_hw_properties(hw, 'bob', user_uri, uhal.ClientInterface)

        # ... and to be rejected when that argument is left out, or is an empty list
        self.assertRaises(uhal.exception, uhal.getDevice, 'bob', user_uri, ADDRESS_FILE)
        self.assertRaises(uhal.exception, uhal.getDevice, 'bob', user_uri, ADDRESS_FILE, [])


class TestSingle(unittest.TestCase):
    """
    TestCase sub-class checking write & read behaviour for a single register.
    Analogous to C++ test_dummy_single exe.
    """

    def test_connect_write_read(self):
        # Device is looked up by ID in the connections file
        manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        hw = manager.getDevice( DEVICE_ID )
        # Queue IPBus transactions
        x = randuint32()
        hw.getNode("REG").write(x)
        mem = hw.getNode("REG").read()
        # Pre-dispatch tests: the value must not be accessible yet, through any of the accessors
        self.assertEqual( mem.valid(), False )
        self.assertRaises( uhal.NonValidatedMemory, mem.value )
        self.assertRaises( uhal.NonValidatedMemory, int, (mem) )
        self.assertRaises( uhal.NonValidatedMemory, hex, (mem) )
        # Dispatch & check value
        hw.dispatch()
        self.assertTrue( mem.valid() )
        self.assertEqual( mem.value(), x )
        self.assertEqual( int(mem)   , x )
        self.assertEqual( hex(mem)   , hex(x).lower().rstrip("l") ) # lower() and rstrip("l") needed for running on 32-bit machines

    def test_on_the_fly_connect_write_read(self):
        # Device is created directly from an ID, URI and address file, without a connections file
        hw = uhal.getDevice("test_device_id", "ipbusudp-1.3://localhost:50001", ADDRESS_FILE)
        # Queue IPBus transactions
        x = randuint32()
        hw.getNode("REG").write(x)
        mem = hw.getNode("REG").read()
        # Pre-dispatch tests: the value must not be accessible yet
        self.assertEqual( mem.valid(), False)
        self.assertRaises( uhal.NonValidatedMemory, mem.value )
        self.assertRaises( uhal.NonValidatedMemory, int, (mem) )
        # Dispatch & check value
        hw.dispatch()
        self.assertTrue( mem.valid() )
        self.assertEqual( mem.value(), x )
        self.assertEqual( int(mem)   , x )

    def test_search_device_id(self):
        manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        # Pattern anchored at both ends: exactly one ID is expected to match, DEVICE_ID itself
        ids = manager.getDevices( "^"+DEVICE_ID+"$" )
        self.assertEqual( len(ids), 1 )
        self.assertIn( DEVICE_ID, ids )

        # Raw string literal, since "\." is not a valid escape sequence in a normal string literal
        # (the pattern passed to getDevices is unchanged: .+\.udp )
        ids = manager.getDevices( r".+\.udp" )
        self.assertEqual( len(ids), 1 )
        self.assertIn( "dummy.udp", ids )


class TestBlock(unittest.TestCase):
    """
    TestCase sub-class checking behaviour of block reads & writes.
    Also includes tests of all of ValVector_uint32 API - incl. indexing & iteration.
    """

    def setUp(self):
        # Only the device is kept; the connection manager is a local of this method
        manager = uhal.ConnectionManager(CONNECTIONS_FILE)
        self.hw = manager.getDevice(DEVICE_ID)

    def exe_test_block_write_read(self, N):
        """
        Write N random values to node "MEM" with writeBlock, read N values back with readBlock,
        and check the object returned by readBlock both before and after the dispatch.
        After the dispatch, the same checks are also applied to a numpy array created from that object.
        """
        # Queue IPBus transactions
        xx = list_randuint32s(N)
        self.hw.getNode("MEM").writeBlock( xx )
        memVals = self.hw.getNode("MEM").readBlock(N)

        # Pre-dispatch tests
        # Size is already known, but every access to the values is expected to raise NonValidatedMemory
        self.assertEqual( memVals.valid(), False )
        self.assertEqual( memVals.size(), N )
        self.assertEqual( len(memVals), N )
        for i in range(N):
           self.assertRaises( uhal.NonValidatedMemory, memVals.at, (i) )
           self.assertRaises( uhal.NonValidatedMemory, memVals.__getitem__ , (i) )
        self.assertRaises( uhal.NonValidatedMemory, memVals.value )
        self.assertRaises( uhal.NonValidatedMemory, memVals.__iter__ )
        # Indices just outside of [-N, N-1] are expected to raise IndexError instead
        self.assertRaises( IndexError , memVals.__getitem__ , (-N-1) )
        self.assertRaises( IndexError , memVals.__getitem__ , (N) )

        # A memoryview can be created before the dispatch, and is expected to report
        # an empty format, zero item size and zero dimensions
        memView = memoryview(memVals)
        self.assertEqual( memView.format , '' )
        self.assertEqual( memView.itemsize , 0 )
        # self.assertEqual( memView.shape , None )
        self.assertEqual( memView.ndim , 0 )

        # Dispatch & check values
        self.hw.dispatch()
        self.assertTrue( memVals.valid() )
        self.assertEqual( memVals.size(), N )
        self.assertEqual( len(memVals), N )
        self.assertEqual( xx , memVals.value() )

        # Create array using Python Buffer Protocol (zero copy mechanism)
        _ = memoryview(memVals) # memoryview function raises TypeError if buffer protocol not implemented
        memView = numpy.array(memVals, copy = False)
        # Assignment to an element of the array is expected to fail with ValueError
        with self.assertRaises(ValueError):
            memView[0] = 42
        self.assertEqual( len(memView), N )

        # Test indexing directly with ValVector_uint32 API, and with zero-copy array
        # (index i-N is the negative index that refers to the same element as index i)
        for i in range(N):
            self.assertEqual( xx[i], memVals.at(i) )
            self.assertEqual( xx[i], memVals[i] )
            self.assertEqual( xx[i], memVals[i-N] )
            self.assertEqual( xx[i], memView[i] )
            self.assertEqual( xx[i], memView[i-N] )

        # Test iteration directly with ValVector_uint32 API, and with zero-copy array
        self.assertEqual( xx, [x for x in memVals] )
        self.assertEqual( xx, [x for x in memView] )

        # Test slicing directly with ValVector_uint32 API, and with zero-copy array
        # (each slice must give the same result as the same slice of the python list xx)
        self.assertEqual( xx, memVals[:] )
        self.assertEqual( xx, list(memView[:]) )
        self.assertEqual( xx[1:2], memVals[1:2] )
        self.assertEqual( xx[1:2], list(memView[1:2]) )
        self.assertEqual( xx[1:100], memVals[1:100] )
        self.assertEqual( xx[1:100], list(memView[1:100]) )
        self.assertEqual( xx[::2], memVals[::2] )
        self.assertEqual( xx[::2], list(memView[::2]) )
        self.assertEqual( xx[::-1], memVals[::-1] )
        self.assertEqual( xx[::-1], list(memView[::-1]) )
        self.assertEqual( xx[-12:-2], memVals[-12:-2] )
        self.assertEqual( xx[-12:-2], list(memView[-12:-2]) )


    def test_block_write_read(self):
        # Block sizes (number of values): 1, 256, 1024 and 262144
        for N in [1, 1024//4, 1024, 1024*1024//4]:
            self.exe_test_block_write_read(N)

    def exe_test_block_offset_write_read(self, N, offset):
        """
        Same as exe_test_block_write_read, but using writeBlockOffset / readBlockOffset with the given offset,
        and without the memoryview / numpy array and slicing checks.
        """
        # Queue IPBus transactions
        xx = list_randuint32s(N)
        self.hw.getNode("MEM").writeBlockOffset( xx, offset )
        memVals = self.hw.getNode("MEM").readBlockOffset(N, offset)
        # Pre-dispatch tests
        self.assertEqual( memVals.valid(), False )
        self.assertEqual( memVals.size(), N )
        self.assertEqual( len(memVals), N )
        for i in range(N):
           self.assertRaises( uhal.NonValidatedMemory, memVals.at, (i) )
           self.assertRaises( uhal.NonValidatedMemory, memVals.__getitem__ , (i) )
        self.assertRaises( uhal.NonValidatedMemory, memVals.value )
        self.assertRaises( uhal.NonValidatedMemory, memVals.__iter__ )
        self.assertRaises( IndexError , memVals.__getitem__ , (-N-1) )
        self.assertRaises( IndexError , memVals.__getitem__ , (N) )
        # Dispatch & check values
        self.hw.dispatch()
        self.assertTrue( memVals.valid() )
        self.assertEqual( memVals.size(), N )
        self.assertEqual( len(memVals), N )
        self.assertEqual( xx , memVals.value() )
        # Test ValVector_uint32 indexing
        for i in range(N):
            self.assertEqual( xx[i], memVals[i] )
            self.assertEqual( xx[i], memVals.at(i) )
            self.assertEqual( xx[i], memVals[i-N] )
        # Test ValVector_uint32 iteration
        self.assertEqual( xx, [x for x in memVals] )

    def test_block_offset_write_read(self):
        for N in [1, 1024//4, 1024, 1024*1024//4]:
            # Three offsets per block size: zero, min(N, getSize()-N), and a random offset between 0 and getSize()-N;
            # none of these is larger than getSize()-N (assumes getSize() of node "MEM" is at least N - TODO confirm)
            for offset in [0, min(N,self.hw.getNode("MEM").getSize()-N), randint(0,self.hw.getNode("MEM").getSize()-N)]:
                self.exe_test_block_offset_write_read(N, offset)

    def test_block_bigger_than_size(self):
        # In the *Offset checks below, *(a, b) unpacks into the two positional arguments of the method under test
        # Block size too large (no offset)
        self.assertRaises( uhal.BulkTransferRequestedTooLarge, self.hw.getNode("SMALL_MEM").writeBlock, [0 for i in range(1024*1024)] )
        self.assertRaises( uhal.BulkTransferRequestedTooLarge, self.hw.getNode("SMALL_MEM").readBlock, (1024*1024) )
        # Offset too large
        self.assertRaises( uhal.exception, self.hw.getNode("SMALL_MEM").writeBlockOffset, *([0],1024*1024) )
        self.assertRaises( uhal.exception, self.hw.getNode("SMALL_MEM").readBlockOffset, *(0,1024*1024) )
        # Block size and offset both OK, combination too large
        self.assertRaises( uhal.exception, self.hw.getNode("SMALL_MEM").writeBlockOffset, *([0] * 128,129) )
        self.assertRaises( uhal.exception, self.hw.getNode("SMALL_MEM").readBlockOffset, *(128,129) )


class TestRawClient(unittest.TestCase):
    """
    Exercises writes & reads issued by address, directly on the device's ClientInterface.
    Analogous to C++ test_dummy_rawclient exe.
    """

    def setUp(self):
        connection_manager = uhal.ConnectionManager(CONNECTIONS_FILE)
        self.hw = connection_manager.getDevice(DEVICE_ID)

    def test_single_write_read_rawclient(self):
        raw_client = self.hw.getClient()
        # Queue a write to the address of node "REG", followed by a read of the same address
        written = 1
        reg_address = self.hw.getNode("REG").getAddress()
        raw_client.write(reg_address, written)
        read_back = raw_client.read(reg_address)
        # Before the dispatch: the value must not be accessible
        self.assertEqual(read_back.valid(), False)
        self.assertRaises(uhal.NonValidatedMemory, read_back.value)
        self.assertRaises(uhal.NonValidatedMemory, int, read_back)
        # The same failure must also satisfy a check against the more general uhal.exception
        self.assertRaises(uhal.exception, read_back.value)
        # After the dispatch: the value read back is the value that was written
        raw_client.dispatch()
        self.assertTrue(read_back.valid())
        self.assertEqual(read_back.value(), written)
        self.assertEqual(int(read_back), written)

    def test_mem_write_read_rawclient(self):
        word_count = 1024*1024//4
        raw_client = self.hw.getClient()
        # Queue a block write to the address of node "MEM", followed by a block read of the same size
        written = list_randuint32s(word_count)
        mem_address = self.hw.getNode("MEM").getAddress()
        raw_client.writeBlock(mem_address, written)
        read_back = raw_client.readBlock(mem_address, word_count)
        # Before the dispatch: size is known, but the values must not be accessible
        self.assertEqual(read_back.valid(), False)
        self.assertEqual(read_back.size(), word_count)
        self.assertRaises(uhal.NonValidatedMemory, read_back.at, 0)
        # After the dispatch: every value read back is the value that was written
        raw_client.dispatch()
        self.assertTrue(read_back.valid())
        self.assertEqual(read_back.size(), word_count)
        for position, expected in enumerate(written):
            self.assertEqual(expected, read_back.at(position))

    def test_mem_rmw_bits(self):
        raw_client = self.hw.getClient()

        reg_address = self.hw.getNode( "REG_UPPER_MASK" ).getAddress()
        initial = randuint32()
        and_term = randuint32()
        or_term = randuint32()
        raw_client.write(reg_address, initial)
        rmw_result = raw_client.rmw_bits(reg_address, and_term, or_term)
        read_back = raw_client.read(reg_address)

        raw_client.dispatch()
        # Value returned by rmw_bits must agree with a subsequent read, and with (initial AND 2nd arg) OR 3rd arg
        self.assertEqual(rmw_result.value(), read_back.value())
        self.assertEqual(rmw_result.value(), ((initial & and_term) | or_term))

    def test_mem_rmw_sum(self):
        N = 1024
        raw_client = self.hw.getClient()

        # Each addend is at most 0xffffffff//1024, so the total of all 1024 addends is at most 0xffffffff
        addends = [randint(0, 0xffffffff//1024) for _ in range(N)]
        total = sum(addends)
        reg_address = self.hw.getNode( "SUBSYSTEM1.REG" ).getAddress()

        # The first addend is queued as a plain write twice in a row, before anything is dispatched
        raw_client.write(reg_address, addends[0])
        raw_client.write(reg_address, addends[0])

        # Every other addend goes through rmw_sum, with one dispatch per addend
        for addend in addends[1:]:
            reg = raw_client.rmw_sum(reg_address, addend)
            raw_client.dispatch()
        self.assertEqual(total, reg.value())


class TestConfigSpace(unittest.TestCase):
    """
    Checks reads of single words from configuration space, with and without a mask.
    Analogous to C++ code in test_config_space.cpp
    """

    def test_read_fullWord(self):
        manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        client = manager.getDevice( DEVICE_ID ).getClient()

        for address in range(10):
            if "-1.3://" not in client.uri():
                # The lower 16 bits of the word read are expected to equal the address
                word = client.readConfigurationSpace(address)
                client.dispatch()
                self.assertEqual(word.value() & 0xFFFF, address)
                continue
            # Clients with "-1.3://" in their URI are expected to refuse the read
            self.assertRaises(uhal.ValidationError, client.readConfigurationSpace, address)

    def test_read_masked(self):
        manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        client = manager.getDevice( DEVICE_ID ).getClient()

        for address in range(10):
            if "-1.3://" not in client.uri():
                # Both masked reads are queued before the single dispatch
                lower_byte = client.readConfigurationSpace(address, 0xFF)
                middle_bits = client.readConfigurationSpace(address, 0xFFFF00)
                client.dispatch()
                # Expected values: the masked address, shifted down to bit 0 of the mask
                self.assertEqual(lower_byte.value(), address & 0xFF)
                self.assertEqual(middle_bits.value(), (address & 0xFFFF00) >> 8)
                continue
            # Clients with "-1.3://" in their URI are expected to refuse the read
            self.assertRaises(uhal.ValidationError, client.readConfigurationSpace, address, 0xFF)



class TestCheckPermissions(unittest.TestCase):
    """
    Verifies that the node permissions declared in the xml file are enforced.
    Counterpart of the C++ test_dummy_check_permissions exe.
    """

    def test_check_permissions(self):
        """Forbidden accesses must raise; the permitted write and reads are queued and then dispatched."""
        conn_manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        device = conn_manager.getDevice( DEVICE_ID )

        # (expected exception, node id, method to call, positional arguments)
        # NOTE(review): the last entry expects a *write* to the masked write-only node to be denied;
        # presumably because a masked write needs a read as well - confirm against uhal.
        denied_accesses = (
            (uhal.WriteAccessDenied, "REG_READ_ONLY",         "write", (1,)),
            (uhal.WriteAccessDenied, "REG_MASKED_READ_ONLY",  "write", (1,)),
            (uhal.ReadAccessDenied,  "REG_WRITE_ONLY",        "read",  ()),
            (uhal.ReadAccessDenied,  "REG_MASKED_WRITE_ONLY", "read",  ()),
            (uhal.WriteAccessDenied, "REG_MASKED_WRITE_ONLY", "write", (1,)),
        )
        for expected_error, node_id, method_name, call_args in denied_accesses:
            accessor = getattr(device.getNode(node_id), method_name)
            self.assertRaises(expected_error, accessor, *call_args)

        # Accesses that the permissions allow
        value = randuint32()
        device.getNode("REG_WRITE_ONLY").write(value)
        for readable_id in ("REG_READ_ONLY", "REG_MASKED_READ_ONLY"):
            device.getNode(readable_id).read()
        device.dispatch()


class TestMasking(unittest.TestCase):
    """
    Exercises writes to, and reads from, masked registers.
    Counterpart of the C++ test_dummy_masking exe.
    """

    def test_write_read_masked(self):
        """Check the reported masks, rejection of writes inconsistent with the mask, and a write/read round trip."""
        conn_manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        device = conn_manager.getDevice( DEVICE_ID )

        # Masks reported by the two nodes
        for node_id, expected_mask in ( ("REG_LOWER_MASK", 0xffff), ("REG_UPPER_MASK", 0xffff0000) ):
            self.assertEqual( device.getNode(node_id).getMask(), expected_mask )

        # A write that is inconsistent with the node's mask must raise
        for node_id, bad_value in ( ("REG_LOWER_MASK", 0x1ffff), ("REG_UPPER_MASK", 0xffff1000) ):
            self.assertRaises( uhal.BitsSetWhichAreForbiddenByBitMask, device.getNode(node_id).write, bad_value )

        # Write acceptable values: the two 16-bit halves of one random 32-bit word
        x = randuint32()
        lower_half = x & 0xffff
        upper_half = x >> 16
        device.getNode( "REG_LOWER_MASK" ).write( lower_half )
        device.getNode( "REG_UPPER_MASK" ).write( upper_half )
        reg_l = device.getNode("REG_LOWER_MASK").read()
        reg_u = device.getNode("REG_UPPER_MASK").read()
        both_regs = (reg_l, reg_u)

        ### Before dispatch: values are not valid and cannot be retrieved
        for reg in both_regs:
            self.assertEqual( reg.valid(), False )
        for reg in both_regs:
            self.assertRaises( uhal.exception, reg.value )

        ### After dispatch: values are valid, fit in 16 bits and match what was written
        device.dispatch()
        self.assertTrue( reg_l.valid() and reg_u.valid() )
        for reg in both_regs:
            self.assertTrue( reg.value()<=0xffff )
        self.assertEqual( reg_l.value(), lower_half )
        self.assertEqual( reg_u.value(), upper_half )


class TestMetaInfo(unittest.TestCase):
    """
    Checks the meta info (i.e. id, uri, node address, node permission, etc) exposed by uhal classes.
    Counterpart of the C++ test_dummy_metainfo exe.
    """

    def setUp(self):
        """Create a ConnectionManager from the connections file before each test."""
        self.manager = uhal.ConnectionManager(CONNECTIONS_FILE)

    def test_ConnectionManager_metainfo(self):
        """Run meta-info tests for ConnectionManager class."""
        expected_ids = sorted(['dummy.udp','dummy.udp2',
                               'dummy.tcp','dummy.tcp2',
                               'dummy.controlhub', 'dummy.controlhub2',
                               'dummy.pcie2'])
        devices_list = self.manager.getDevices()
        self.assertTrue( isinstance(devices_list, list) )
        self.assertEqual( len(devices_list), 7 )
        self.assertEqual( devices_list, expected_ids )

        # Device ids can also be selected with a regular expression
        udp_devices = self.manager.getDevices(".*udp.*")
        self.assertEqual( len(udp_devices), 2 )

        # Creating a ConnectionManager from a bad string or a non-existent file must raise
        for bad_connections in ("some_bad_invalid_string", "file://a_nonexistant_conn_file.xml"):
            self.assertRaises( uhal.exception, uhal.ConnectionManager, bad_connections )

    def exe_test_Interfaces_metainfo(self, hwId, hwUri):
        """Execute meta-info tests for a particular hw device & associated client interface."""
        device = self.manager.getDevice(hwId)
        self.assertEqual(device.id(),      hwId)
        self.assertEqual(device.__str__(), hwId)
        self.assertEqual(device.uri(),     hwUri)

        # The client is checked twice: once reached from the device, once reached from one of its nodes
        self.exe_test_ClientInterface_metainfo(device, hwId, hwUri)
        if hwId.startswith("dummy.docu"):
            nodeId = "A"
        else:
            nodeId = "REG"
        self.exe_test_ClientInterface_metainfo( device.getNode(nodeId), hwId, hwUri )

    def exe_test_ClientInterface_metainfo(self, hwOrNode, id, uri):
        """Execute meta-info test for a particular client interface instance."""
        client = hwOrNode.getClient()
        self.assertEqual(client.id(),      id )
        self.assertEqual(client.__str__(), id )
        self.assertEqual(client.uri(),     uri)
        # The timeout is set through a freshly obtained client and read back through the one fetched above
        for timeout in list_randuint32s(20):
            hwOrNode.getClient().setTimeoutPeriod(timeout)
            self.assertEqual(timeout, client.getTimeoutPeriod() )

    def test_HwInterface_metainfo(self):
        """Run meta-info tests for HwInterface and ClientInterface classes."""
        expected_uris = (
            ("dummy.udp",        "ipbusudp-1.3://localhost:50001"),
            ("dummy.tcp",        "ipbustcp-1.3://localhost:50002"),
            ("dummy.controlhub", "chtcp-1.3://localhost:10203?target=localhost:50001"),
        )
        for device_id, device_uri in expected_uris:
            self.exe_test_Interfaces_metainfo(device_id, device_uri)

        # Creating a HwInterface object from an invalid address table string must raise
        # (disabled check) self.assertRaises( uhal.exception, uhal.getDevice, "id0", "ipbusudp-1.3://localhost:12345", "file://a_non_existant_address_table.xml")
        self.assertRaises( uhal.exception, uhal.getDevice, "id1", "ipbusudp-1.3://localhost:23456", "invalid_format.xml")

    def exe_test_node_metainfo(self, deviceId, longId, address, ownId, permission, description, size, mask, mode, tags, pars):
        """Execute meta-info tests for a particular node."""
        hw = self.manager.getDevice( deviceId )
        # (name of the Node method to call, value it is expected to return)
        expectations = (
            ("getAddress",     address),
            ("getId",          ownId),
            ("__str__",        ownId),
            ("getPermission",  permission),
            ("getDescription", description),
            ("getSize",        size),
            ("getMask",        mask),
            ("getMode",        mode),
            ("getTags",        tags),
            ("getParameters",  pars),
        )
        for getter_name, expected in expectations:
            getter = getattr( hw.getNode(longId), getter_name )
            self.assertEqual( getter(), expected )

    def exe_test_node_iteration(self, parent_node):
        """Checks that a node's descendants are iterated over in address order"""
        previous_addr = 0x0
        for descendant in parent_node:
            current_addr = descendant.getAddress()
            self.assertTrue( current_addr >= previous_addr )
            previous_addr = current_addr

    def test_node_metainfo(self):
        """Run meta-info tests for Node class."""
        no_mask = uhal.NOMASK()
        perm = uhal.NodePermission
        rw_mode = uhal.BlockReadWriteMode

        # (id, address, permission, description, size, mask, mode, tags, parameters)
        # For each of these nodes the full id passed to getNode and the node's own id are the same.
        expected_nodes = (
            ("REG",                   0x000001, perm.READWRITE, "", 1, no_mask,    rw_mode.SINGLE, "test", {}),
            ("REG_READ_ONLY",         0x0002,   perm.READ,      "", 1, no_mask,    rw_mode.SINGLE, "",     {}),
            ("REG_WRITE_ONLY",        0x0003,   perm.WRITE,     "", 1, no_mask,    rw_mode.SINGLE, "",     {}),
            ("REG_MASKED_READ_ONLY",  0x0005,   perm.READ,      "", 1, 0xffff0000, rw_mode.SINGLE, "",     {}),
            ("REG_MASKED_WRITE_ONLY", 0x0005,   perm.WRITE,     "", 1, 0x0000ffff, rw_mode.SINGLE, "",     {}),
            ("REG_PARS",              0x0006,   perm.READWRITE, "", 1, no_mask,    rw_mode.SINGLE, "",     {'arg0':'val100','arg1':'val101'}),
            ("MEM",                   0x100000, perm.READWRITE, "A block memory in an example XML file",
                                                                262144, no_mask, rw_mode.INCREMENTAL, "",  {}),
        )
        for node_id, address, permission, description, size, mask, mode, tags, pars in expected_nodes:
            self.exe_test_node_metainfo(DEVICE_ID, node_id, ownId=node_id, address=address, permission=permission,
                                        description=description, size=size, mask=mask, mode=mode, tags=tags, pars=pars)

        # Node counts below the top-level node: all of them, and those matching a regular expression
        top_node = self.manager.getDevice( DEVICE_ID ).getNode()
        self.assertEqual(len(top_node.getNodes()), 55)
        self.assertEqual(len(top_node.getNodes(r".*_MEM")), 2)

        # Iteration order, starting from the top-level node and from two of its children
        self.exe_test_node_iteration( top_node )
        for child_id in ("REG", "SUBSYSTEM1"):
            self.exe_test_node_iteration( top_node.getNode(child_id) )



class TestPrintValMem( unittest.TestCase ):
    """Checks the __str__ method (used by 'print') of ValWord_uint32 and ValVector_uint32."""

    def test_ValWord(self):
        """Write 5 random words to REG; each read-back must print as the decimal value once dispatched."""
        conn_manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        device = conn_manager.getDevice( DEVICE_ID )
        for _ in range(5):
            value = randuint32()
            device.getNode("REG").write(value)
            word = device.getNode("REG").read()
            # Before dispatch, converting to a string must raise
            self.assertRaises( uhal.NonValidatedMemory, word.__str__ )
            device.dispatch()
            self.assertEqual( word.__str__() , str(value) )

    def exe_ValVector_test(self, xx):
        """Write the list xx to MEM, read it back and compare the block's string form with "[a, b, ...]"."""
        # Expected string; any trailing "L" is dropped from each number in case working on 32-bit computer
        expected_str = "[" + ", ".join( str(x).rstrip("L") for x in xx ) + "]"

        # Execute the test
        conn_manager = uhal.ConnectionManager( CONNECTIONS_FILE )
        device = conn_manager.getDevice( DEVICE_ID )
        device.getNode("MEM").writeBlock( xx )
        block = device.getNode("MEM").readBlock( len(xx) )
        # Before dispatch, converting to a string must raise
        self.assertRaises( uhal.NonValidatedMemory, block.__str__ )
        device.dispatch()
        self.assertEqual( block.__str__() , expected_str )

    def test_ValVector(self):
        """Run the vector check on a short fixed list and on a list of 1024*1024//4 random words."""
        small_fixed_list = [0, 42, 56, 75]
        self.exe_ValVector_test( small_fixed_list )
        self.exe_ValVector_test( list_randuint32s(1024*1024//4) )


class TestSigBusBlocked( unittest.TestCase ):
    """Checks that SIGBUS has been blocked automatically by 'import uhal'."""

    def test_SigBusGuard_creation(self):
        """Create a SigBusGuard; the test passes if no exception is raised."""
        # The function below invokes the SigBusGuard constructor, which throws if SIGBUS is not already blocked
        create_guard = uhal.tests.create_sigbus_guard
        create_guard()


# Run the tests
if __name__ == '__main__':

    # The connections xml file is taken from the first two command-line arguments ("-c <file>"),
    # which are removed from sys.argv; the rest of the arguments are left for unittest.
    if len(sys.argv)<3:
        sys.stderr.write("ERROR: Connections file not specified\n")
        print(__doc__)
        sys.exit(2)

    try:
        opts, args = getopt.getopt([sys.argv.pop(1),sys.argv.pop(1)], "c:", ["connections-file="])
    except getopt.GetoptError as err:
        print(err)
        print(__doc__)
        sys.exit(2)

    # getopt stops at the first non-option argument, so two plain arguments (e.g. "foo bar")
    # give an empty option list; report that as a usage error instead of an IndexError on opts[0].
    if not opts:
        sys.stderr.write("ERROR: Connections file not specified\n")
        print(__doc__)
        sys.exit(2)

    # Module-level settings read by the test classes
    CONNECTIONS_FILE = opts[0][1]
    ADDRESS_FILE = os.path.join(os.path.split(CONNECTIONS_FILE)[0], "dummy_address.xml")
    DEVICE_ID = "dummy.udp"

    # Check that '-v' is first remaining option if present - otherwise unittest module fails, with incomprehensible stack trace
    verbose_count = sys.argv.count('-v')
    if ( verbose_count==1 and sys.argv[1]!='-v' ) or verbose_count>1 :
        print("Invalid usage of '-v' option - if used it should only come directly after connection file")
        print("E.g: To run only the unit tests under TestPrintValMem try ...")
        print("     python test_pycohal -c file:///dir/to/connections_file.xml  -v  TestPrintValMem")
        print(__doc__)
        sys.exit(2)

    # Now run tests ...
    print('Running tests using connections file "'+CONNECTIONS_FILE+'"\n')
    uhal.setLogLevelTo( uhal.LogLevel.FATAL )
    unittest.main()

