onePK (1.1.0) Python basic tutorial

Document created by chomjakrichard on Mar 31, 2014. Last modified by chomjakrichard on Apr 1, 2014.
Version 4Show Document
  • View in full screen mode

Hi, I wrote a small tutorial for onePK Python beginners. Run the script like this:

python script.py -a 120.120.120.2 -u cisco1 -p cisco1 -R /home/cisco/ca.pem -i GigabitEthernet0/0

 

#!/usr/bin/env python

#Author: Richard Chomjak
#About: THIS IS SAMPLE TUTORIAL OF ONEPK!
#FEEL FREE TO USE/MODIFY...
#
#TODO: Add policy map + event
#TODO: create some conf file

import onep.element as onepElement
import onep.core.util.OnepConstants as onepConstants

import onep.core.exception.OnepException as onepException


import onep.policy.L3Acl as onepL3ACL
import onep.policy.L3Ace as onepL3ACE


import onep.policyservice.BulkService as onepBulkService
import onep.policyservice.PolicyQuery as onepPolicyQuery
import onep.policyservice.PolicyCapabilities as onepPolicyCapabilities
import onep.policyservice.Match as onepMatch

import onep.vty.VtyService as onepVTYService

import signal
import sys

import argparse

# Command-line interface of the tutorial script.
# -a, -R, -u and -p are mandatory; -i and -n fall back to their defaults.
# Every option is a plain string; the table below keeps the original
# declaration order so the generated --help output stays the same.
parser = argparse.ArgumentParser(description="Tiny examle of onePK.")
for _flags, _settings in (
    (("-i", "--interface"),
     dict(help="Interface name for apply ACL ->"
               " full name for example =GigabitEthernet0/0.",
          required=False, default="GigabitEthernet0/0")),
    (("-a", "--address"),
     dict(help="Address of network element", required=True)),
    (("-R", "--cacert"),
     dict(help="CA cert path", required=True)),
    (("-u", "--user"),
     dict(help="Name of username for connection purpose.", required=True)),
    (("-p", "--passw"),
     dict(help="Password of username for connection purpose.", required=True)),
    (("-n", "--appname"),
     dict(help="Name of aplication (show onep session).",
          required=False, default="OnepAPP")),
):
    parser.add_argument(*_flags, type=str, **_settings)
# Do not leave the loop helpers behind in the module namespace.
del _flags, _settings


def sigint_handler(signal, frame):
    """Handle SIGINT (Ctrl-C): close the onePK session and exit with status 0.

    The handler uses the module-level ``network_element`` that the
    ``__main__`` section creates.

    Args:
        signal: Signal number passed in by the ``signal`` module (unused).
        frame: Interrupted stack frame (unused).
    """
    print("EXITING...")
    # The handler can fire before the connection has been established, in
    # which case the global does not exist yet; only disconnect when there
    # is an element to disconnect.
    element = globals().get("network_element")
    if element is not None:
        element.disconnect()
    sys.exit(0)


def vty_handle(network_element, cmd="show proto\r\n"):
    """Run one CLI command on the element through the onePK VTY service.

    Very basic use of the VTY service; see the onePK documentation for the
    parser state etc. According to the original author's notes the VTY
    service set must be enabled on the device first::

        en
        conf t
        onep
        service set vty

    Args:
        network_element: Element handed to ``VtyService``.
        cmd: Command text passed to ``VtyService.write``.

    Returns:
        Whatever ``VtyService.write`` returns for ``cmd``.

    Raises:
        Exception: Any failure is reported on stderr and re-raised.
    """
    try:
        vty = onepVTYService.VtyService(network_element)
        vty.open()
        try:
            return vty.write(cmd)
        finally:
            # Close the VTY even when the write fails so it is not leaked.
            vty.close()
    except Exception as e:
        sys.stderr.write("Something is wrong :-(. Probably you have not set vty SS.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise


def interface_obj(network_element, interface_name="GigabitEthernet0/0"):
    """Look up an interface object on the element by its full name.

    Args:
        network_element: Object providing ``get_interface_by_name``.
        interface_name: Full interface name, ``GigabitEthernet0/0`` by default.

    Returns:
        The object returned by ``network_element.get_interface_by_name``.

    Raises:
        Exception: Any lookup failure is reported on stderr and re-raised.
    """
    try:
        return network_element.get_interface_by_name(interface_name)
    except Exception as e:
        sys.stderr.write("Something is wrong :-(.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise


def create_L3acl_ace(id_ace=1, permit=True, src_prefix=16,
                     src=None, dst_prefix=24, dst=None, dscp="000048"):
    """Create one L3 ACE (an entry of an L3 ACL).

    When any of ``src``, ``src_prefix``, ``dst`` or ``dst_prefix`` is None
    the entry matches any source and any destination (for example
    "1 permit any any") and ``dscp`` is not applied.

    Args:
        id_ace: First argument of the ACE constructor (the entry number).
        permit: True for a permit entry, False for a deny entry.
        src_prefix: Source prefix length.
        src: Source prefix, for example ``'1.1.1.0'``.
        dst_prefix: Destination prefix length.
        dst: Destination prefix, for example ``'2.2.0.0'``.
        dscp: DSCP value in decimal notation; ``"000048"`` is CS6 according
            to the original author.

    Returns:
        The ACE object, or None when its creation failed (the failure is
        reported on stderr).
    """
    try:
        ace = onepL3ACE(id_ace, permit)
        if src_prefix is None or src is None or dst_prefix is None or dst is None:
            ace.set_src_prefix_any()
            ace.set_dst_prefix_any()
            return ace

        ace.src_prefix = str(src)
        ace.src_prefix_len = int(src_prefix)
        ace.dst_prefix = str(dst)
        ace.dst_prefix_len = int(dst_prefix)
        # NOTE(review): this assigns to an attribute named ``set_dscp`` rather
        # than calling a setter - confirm against the L3Ace API whether the
        # DSCP match is really configured this way.
        ace.set_dscp = int(dscp)
        return ace

    except Exception as e:
        sys.stderr.write("FAILED WHILE CREATING ACE.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here instead of reporting the failure.
        sys.stderr.write("\n" + str(e) + "\n")
        return None


def create_L3acl(network_element, storage_type="TRANSIENT", AF="AF_INET"):
    """Create an L3 ACL on the element.

    Per the original author's notes a TRANSIENT ACL exists only while the
    application is connected (see ``show onep session``), while a PERSISTENT
    one is written into the running config.

    Args:
        network_element: Element the ACL is created for.
        storage_type: Suffix of the ``OnepLifetime`` constant to use; the
            name looked up is ``"ONEP_" + storage_type``.
        AF: Suffix of the ``OnepAddressFamilyType`` constant to use; the
            name looked up is ``"ONEP_" + AF``.

    Returns:
        The ACL object, or None when its creation failed (the failure is
        reported on stderr).
    """
    try:
        address_family = getattr(onepConstants.OnepAddressFamilyType,
                                 "ONEP_" + str(AF))
        lifetime = getattr(onepL3ACL.OnepLifetime, "ONEP_" + str(storage_type))
        return onepL3ACL(network_element, address_family, lifetime)
    except Exception as e:
        sys.stderr.write("error: Probably you set bad storage type or AF(Address Family).")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here instead of reporting the failure.
        sys.stderr.write("\n" + str(e) + "\n")
        return None


def add_ace_2_ACL(acl=None, ace=None):
    """Add an ACE to an ACL.

    Args:
        acl: Object providing ``add_ace``.
        ace: Entry handed to ``acl.add_ace``.

    Returns:
        1 when ``acl`` or ``ace`` is None, otherwise None. A failure inside
        ``add_ace`` is reported on stderr and not re-raised.
    """
    if acl is None or ace is None:
        sys.stderr.write("ACL or ACE is None.")
        return 1
    try:
        acl.add_ace(ace)
    except Exception as e:
        sys.stderr.write("Something is wrong :-(")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here instead of reporting the failure.
        sys.stderr.write("\n" + str(e) + "\n")
    return None


def acl_apply(acl=None, direction="BOTH", interface=None):
    """Apply an ACL to an interface.

    Args:
        acl: ACL object providing ``apply_to_interface`` and ``Direction``.
        direction: Suffix of the direction constant; the name looked up on
            ``acl.Direction`` is ``"ONEP_DIRECTION_" + direction``.
        interface: Interface object the ACL is applied to.

    Returns:
        1 when ``acl`` or ``interface`` is None, otherwise None. A failure
        while applying is reported on stderr and not re-raised.
    """
    if interface is None or acl is None:
        sys.stderr.write("error: interface or acl is None!")
        return 1
    try:
        acl.apply_to_interface(interface,
                               getattr(acl.Direction, "ONEP_DIRECTION_" + str(direction)))
    except Exception as e:
        sys.stderr.write("Something is wrong :-(. mb direction")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here instead of reporting the failure.
        sys.stderr.write("\n" + str(e) + "\n")
    return None


def _normalize_cap_name(raw_name):
    """Turn a capability name such as 'IOS DATAPATH' into 'DATAPATH'."""
    name = str(raw_name).strip()
    # Drop the leading "IOS" word only. str.strip(' IOS') would remove every
    # leading/trailing ' ', 'I', 'O' and 'S' character and mangle names such
    # as "QOS INGRESS" into "QOS_INGRE".
    if name.startswith("IOS "):
        name = name[len("IOS "):].lstrip()
    return name.replace(" ", "_").upper()


def list_of_policy_cap(network_element):
    """Return the names of the policy capabilities reported for the element.

    The original author could not find an enum mapping capabilities from int
    to str, so the names are derived from each capability's ``name``: a
    leading "IOS " (present on vIOS, for example 'IOS DATAPATH') is removed,
    spaces become underscores and the result is upper-cased.

    Args:
        network_element: Element passed to
            ``onepPolicyCapabilities.get_capabilities``.

    Returns:
        A list of normalized capability names.
    """
    capabilities = onepPolicyCapabilities.get_capabilities(network_element)
    return [_normalize_cap_name(cap.name) for cap in capabilities]


def create_bulk_service(network_element):
    """Create the bulk service for the element.

    Per the original author the bulk service is needed to create a policy
    map or a class map.

    Args:
        network_element: Element handed to the bulk service constructor.

    Returns:
        The bulk service object.

    Raises:
        Exception: Any failure is reported on stderr and re-raised.
    """
    try:
        return onepBulkService(network_element)
    except Exception as e:
        sys.stderr.write("Something is wrong :-(.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise


def _report_class_map_failure(network_element, headline, policy_map_type, error):
    """Write a class-map creation failure and the capability hint to stderr."""
    sys.stderr.write(headline + str(policy_map_type) + "\n")
    try:
        supported = str(list_of_policy_cap(network_element))
    except Exception as lookup_error:
        # The capability list is only a hint; a failing lookup must not hide
        # the error that is being reported.
        supported = "<unavailable: %s>" % lookup_error
    sys.stderr.write("List of possible policy cap." + supported + "\n")
    # write() needs a string, not the exception object itself.
    sys.stderr.write(str(error) + "\n")


def create_class_map(network_element, blk_src=None, policy_map_type="QOS_INGRESS",
                     class_map_name="classmap_name"):
    """Create a class map through the bulk service.

    Per the original author a policy map of a given type (for example
    QOS_INGRESS) must be used with a class map of the same type.

    Args:
        network_element: Element the class map is created for.
        blk_src: Bulk service object providing ``create_class``.
        policy_map_type: Name of the ``PolicyCapabilitiesType`` constant;
            it is upper-cased before the lookup.
        class_map_name: Name given to the class map.

    Returns:
        The created class map, or 1 when ``blk_src`` is None.

    Raises:
        Exception: Any failure (including ``OnepNotSupportedException`` for
            an unsupported policy type) is reported on stderr together with
            the element's capability names and re-raised.
    """
    if blk_src is None:
        sys.stderr.write("error: BULK SERVICE IS NONE!!!")
        return 1

    try:
        cl_map = (blk_src.create_class
                  (getattr(onepPolicyQuery.PolicyCapabilitiesType,
                   str(policy_map_type).upper()), network_element, str(class_map_name)))
    # Example of use onep exception
    except onepException.OnepNotSupportedException as e:
        _report_class_map_failure(
            network_element, "error: Trying to use unsupported policy type: ",
            policy_map_type, e)
        raise
    except Exception as e:
        _report_class_map_failure(
            network_element, "error: probably you set wrong value of policy_map_type: ",
            policy_map_type, e)
        raise

    # TODO (original author): set the class map lifetime (storage type).
    return cl_map


def add_acl_2_class_map(class_map=None, acl=None):
    """Add an ACL match to a class map.

    Only the match is added here; submitting the class map is done
    separately (see ``bulk_src_submit_class``).

    Args:
        class_map: Object providing ``add_match``.
        acl: ACL wrapped in ``onepMatch.ACL`` and added as a match.

    Returns:
        1 when ``class_map`` or ``acl`` is None, otherwise None.

    Raises:
        Exception: Any failure is reported on stderr and re-raised.
    """
    if class_map is None or acl is None:
        sys.stderr.write("error: CLASS map or ACL is None!")
        return 1
    try:
        class_map.add_match(onepMatch.ACL(acl))
    except Exception as e:
        sys.stderr.write("Something is wrong :-(.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise
    return None


def bulk_src_submit_class(blk_src=None, class_map=None):
    """Submit a class map through the bulk service.

    Args:
        blk_src: Bulk service object providing ``submit_class_map``.
        class_map: Class map to submit.

    Returns:
        1 when ``blk_src`` or ``class_map`` is None, otherwise None.

    Raises:
        Exception: Any failure is reported on stderr and re-raised.
    """
    if blk_src is None or class_map is None:
        sys.stderr.write("error: BULK SERVICE or CLASS map is None!")
        return 1
    try:
        blk_src.submit_class_map(class_map)
    except Exception as e:
        sys.stderr.write("Something is wrong :-(.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise
    return None


def connect_2_element(address_element, app_name, username, passw, ca_cert):
    """Connect to a network element over TLS.

    Args:
        address_element: Address of the network element.
        app_name: Application name used for the ``NetworkElement``.
        username: User name for the connection.
        passw: Password for the connection.
        ca_cert: Path assigned to ``SessionConfig.ca_certs``.

    Returns:
        A ``(network_element, session_handle)`` tuple, where
        ``session_handle`` is the value returned by ``connect``.

    Raises:
        Exception: Any failure is reported on stderr and re-raised.
    """
    try:
        network_element = onepElement.NetworkElement(str(address_element), str(app_name))
        session_config = onepElement.SessionConfig(
            onepElement.SessionConfig.SessionTransportMode.TLS)
        session_config.ca_certs = str(ca_cert)
        session_handle = network_element.connect(str(username), str(passw), session_config)
        return network_element, session_handle
    except Exception as e:
        sys.stderr.write("Something is wrong :-(.")
        # write() needs a string; passing the exception object itself would
        # raise TypeError here and hide the real error.
        sys.stderr.write("\n" + str(e) + "\n")
        raise

if __name__ == '__main__':
    # Tutorial flow: connect, build an ACL with one ACE, apply it to the
    # interface, wrap it in a class map and show the result via the VTY.

    parsed = parser.parse_args()
    app_2_interface_name = parsed.interface
    network_element, session_handle = (connect_2_element(
                                       parsed.address, parsed.appname,
                                       parsed.user, parsed.passw, parsed.cacert))
    # Install the handler only now: it disconnects the global
    # network_element, which does not exist before the connection is made.
    signal.signal(signal.SIGINT, sigint_handler)

    try:
        print(vty_handle(network_element, "show onep status\r\n"))
        L3_ACE = create_L3acl_ace(id_ace=42, permit=True, src_prefix=24,
                                  src="5.5.5.0", dst_prefix=24, dst="123.123.123.0")
        L3_ACL = create_L3acl(network_element)
        add_ace_2_ACL(L3_ACL, L3_ACE)

        interface = interface_obj(network_element, app_2_interface_name)
        print(vty_handle(network_element, "show ip access-lists dynamic\r\n"))
        acl_apply(acl=L3_ACL, interface=interface)
        blk_src = create_bulk_service(network_element)
        class_map = create_class_map(network_element, blk_src=blk_src,
                                     policy_map_type="QOS_INGRESS")
        add_acl_2_class_map(class_map=class_map, acl=L3_ACL)
        bulk_src_submit_class(blk_src, class_map)
        print(vty_handle(network_element, "show class-map\r\n"))
    except SystemExit:
        # Raised by sigint_handler, which has already closed the session.
        raise
    except BaseException:
        # Do not leave the session open on the device when a step fails.
        network_element.disconnect()
        raise
    else:
        network_element.disconnect()

Attachments

    Outcomes