次のスクリプトの例は、Python で記述されています。Crosswork Traffic Analysis API を実行するには、python/get_traffic_example.py および python/cctraffic/cctraffic.py が必要です。get_traffic_example.py を実行する前に、次のことを行う必要があります。
- Python の依存関係をインストールします: pip3 install -r requirements.txt
- API ベアラートークンを設定します: export TOKEN=<token string>
- get_traffic_example.py ファイルを編集し、次の値を正しい値に置き換えます: api_version、device_name、start、および end。
get_traffic_example.py ファイルを編集した後、スクリプトを実行します: python3 get_traffic_example.py
スクリプトの例:get_traffic_example.py
# get_traffic_example.py
import os
import sys
from cctraffic import CCTrafficRestClient
# Connection settings: edit api_version, device_name, start and end before running.
host = "https://crosswork.cisco.com"
api_version = "beta"
device_name = "flow-automation-1"

# start and end may be supplied as:
# - ISO 8601 datetime string
# - unix timestamp in seconds since 1970
# - "now"
# - "<number> <unit> ago" where unit can be: "seconds", "minutes", "hours", "days".
start = "7 days ago"
end = "now"


def require_data(result, description):
    """Return *result* unchanged, or exit with a message when it is empty.

    The client returns an empty string when a request does not succeed, and a
    query may also legitimately return an empty list; indexing either one
    would otherwise crash with an unhelpful TypeError/IndexError.
    """
    if not result:
        print(f"No data returned for {description}")
        sys.exit(-1)
    return result


# The bearer token is read from the environment so it never lives in the script.
if "TOKEN" not in os.environ:
    print("Bearer token not found. Set bearer token with: export TOKEN=<token string>")
    sys.exit(-1)
token = os.environ["TOKEN"]

client = CCTrafficRestClient(host, token, version=api_version, debug=False)

# Resolve the device name to the device ID used by every other call.
print(f"GetDevice for {device_name}")
device_info = require_data(client.GetDevice(device_name), f"device {device_name}")
device_id = device_info["deviceId"]
print(f"Found device ID for {device_name}: {device_id}")

# Interface counter totals; the first interface reported is used for the drill-downs below.
print(f"Traffic by interface for {device_name}")
traffic_for_my_device = require_data(
    client.GetInterfaceCounterTrafficTotals(start, end, device_id),
    f"interface traffic of {device_name}",
)
interface_name = traffic_for_my_device[0]["interfaces"][0]["interfaceName"]

print(f"Traffic by ASN for {device_name}/{interface_name}")
asn_traffic_for_my_device_interface = require_data(
    client.GetNetFlowTrafficTotalsByDevice(
        start, end, device_id, interface=interface_name, asn_breakdown=True
    ),
    f"ASN traffic of {device_name}/{interface_name}",
)

print(f"Traffic by Prefix for {device_name}/{interface_name}")
prefix_traffic_for_my_device_interface = require_data(
    client.GetNetFlowTrafficTotalsByDevice(
        start, end, device_id, interface=interface_name, prefix_breakdown=True
    ),
    f"prefix traffic of {device_name}/{interface_name}",
)

# Pick the first ASN and the first prefix reported for the interface.
asn = asn_traffic_for_my_device_interface[0]["interfaces"][0]["asns"][0]["asn"]
device_prefix = prefix_traffic_for_my_device_interface[0]["interfaces"][0]["prefixes"][0]["prefix"]

print(f"Traffic by Prefix for {device_name}/{interface_name} ASN {asn}")
prefix_traffic_for_my_device_interface_asn = client.GetNetFlowTrafficTotalsByDevice(
    start, end, device_id, interface=interface_name, asn=asn, asn_breakdown=True
)

print("Traffic by Prefix")
prefix_traffic = require_data(
    client.GetNetFlowTrafficTotalsByPrefix(start, end), "traffic by prefix"
)
prefix = prefix_traffic[0]["prefix"]

print(f"Traffic by Device for {prefix}")
device_traffic_for_prefix = client.GetNetFlowTrafficTotalsByPrefix(start, end, prefix)

print(f"Time series for {device_name}")
time_series_for_device = client.GetInterfaceCounterTrafficTimeSeries(start, end, device_id)

print(f"Time series for {device_name}/{interface_name}")
time_series_for_interface = client.GetInterfaceCounterTrafficTimeSeries(
    start, end, device_id, interface=interface_name
)

print(f"Time series for {device_name}/{interface_name} {device_prefix}")
time_series_for_prefix = client.GetNetFlowTrafficTimeSeriesByDevice(
    start, end, device_id, interface=interface_name, prefix=device_prefix
)

print(f"Time series for {device_name}/{interface_name} {asn}")
time_series_for_asn = client.GetNetFlowTrafficTimeSeriesByDevice(
    start, end, device_id, interface=interface_name, asn=asn
)
スクリプトの例:cctraffic.py
# cctraffic.py
# Contains a very simple REST client to demonstrate how to call the Crosswork Cloud Traffic APIs
# Copyright (c) 2021 Cisco Systems, Inc. and others. All rights reserved.
import requests
from .util import UrlEncode
import urllib3
# Suppress urllib3's InsecureRequestWarning, which would otherwise be emitted
# because this module sends its requests with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class CCTrafficRestClient:
    """Very small REST client demonstrating the Crosswork Cloud Traffic APIs.

    Every request is an HTTP GET carrying a bearer-token ``Authorization``
    header. The query methods return the relevant list taken from the JSON
    response, or an empty string when the response status is not 200.
    """

    def __init__(self, host: str, token, version: str = "v1", debug: bool = False):
        """Store the connection settings and build the request headers.

        Args:
            host: Base URL including the scheme; API paths are appended to it.
            token: Bearer token placed in the ``Authorization`` header.
            version: API version used as a path segment (``/api/<version>/``).
            debug: When true, each URL, status code and response body is printed.
        """
        self.version = version
        self.host = host
        self.debug = debug
        self.headers = {"content-type": "application/json", "Authorization": f"Bearer {token}"}

    def DoApiCall(self, url):
        """Send a GET request to *url* and return the ``requests`` response object."""
        if self.debug:
            print(url)
        # NOTE(review): verify=False disables TLS certificate verification while a
        # bearer token is being sent. Kept as-is for backward compatibility.
        response = requests.get(url, headers=self.headers, verify=False)
        if self.debug:
            print(response.status_code)
            print(response.content)
        return response

    def _FetchJsonField(self, url: str, key: str):
        """GET *url* and return ``response.json()[key]``, or "" on a non-200 status."""
        response = self.DoApiCall(url)
        if response.status_code != 200:
            return ""
        return response.json()[key]

    def GetDevice(self, device_name: str):
        """Look up a device by name.

        Returns:
            The ``deviceInfo`` object of the first matching device, or "" when
            the request does not return 200 or no device is listed.
        """
        # Encode the name with the same helper used for the other query values
        # so that characters such as spaces or "&" cannot break the query string.
        url = f"{self.host}/api/{self.version}/devices?name={UrlEncode(device_name)}"
        devices = self._FetchJsonField(url, "devices")
        if not devices:
            return ""
        return devices[0]["deviceInfo"]

    def GetInterfaceCounterTrafficTotals(self, start: str, end: str, device_id: str = ""):
        """Return interface counter totals for all devices, or for one device.

        Args:
            start: Start of the time range (sent as ``timeStart``).
            end: End of the time range (sent as ``timeEnd``).
            device_id: When non-empty, limit the query to that device's interfaces.

        Returns:
            The ``devices`` list of the response, or "" on a non-200 status.
        """
        start = UrlEncode(start)
        end = UrlEncode(end)
        if device_id == "":
            url = f"{self.host}/api/{self.version}/devices/statistics/totals"
        else:
            url = f"{self.host}/api/{self.version}/devices/{device_id}/interfaces/statistics/totals"
        url += f"?format=totals&timeStart={start}&timeEnd={end}"
        return self._FetchJsonField(url, "devices")

    def GetNetFlowTrafficTotalsByDevice(self, start: str, end: str, device_id: str, interface: str,
                                        asn: int = 0, prefix: str = "", asn_breakdown: bool = False,
                                        prefix_breakdown: bool = False):
        """Return NetFlow traffic totals for one interface of a device.

        Exactly one view is selected, in this order of precedence:
        a non-zero ``asn`` (prefixes of that ASN), ``asn_breakdown`` (per ASN),
        ``prefix_breakdown`` (per prefix).

        Args:
            start: Start of the time range (sent as ``timeStart``).
            end: End of the time range (sent as ``timeEnd``).
            device_id: Device identifier used in the URL path.
            interface: Interface name; URL-encoded twice before being put in the path.
            asn: ASN to drill into; 0 means "not given".
            prefix: Accepted for backward compatibility; not used to build the URL.
            asn_breakdown: Request the per-ASN breakdown.
            prefix_breakdown: Request the per-prefix breakdown.

        Returns:
            The ``devices`` list of the response, or "" on a non-200 status.

        Raises:
            ValueError: If no view is selected (previously this surfaced as an
                UnboundLocalError because no URL was ever built).
        """
        # Choose the resource first so a bad call fails before any request is made.
        if asn != 0:
            resource = f"asns/{asn}/prefixes"
        elif asn_breakdown:
            resource = "asns"
        elif prefix_breakdown:
            resource = "prefixes"
        else:
            raise ValueError(
                "GetNetFlowTrafficTotalsByDevice requires a non-zero asn, "
                "asn_breakdown=True or prefix_breakdown=True"
            )
        # The interface name is deliberately encoded twice, as in the other methods
        # (presumably because interface names can contain "/" - TODO confirm).
        interface = UrlEncode(UrlEncode(interface))
        start = UrlEncode(start)
        end = UrlEncode(end)
        url = f"{self.host}/api/{self.version}/traffic/devices/{device_id}/interfaces/{interface}/{resource}"
        url += f"?format=totals&timeStart={start}&timeEnd={end}"
        return self._FetchJsonField(url, "devices")

    def GetNetFlowTrafficTotalsByPrefix(self, start: str, end: str, prefix: str = "", device_id: str = ""):
        """Return NetFlow traffic totals by prefix.

        With no ``prefix`` all prefixes are queried; with a ``prefix`` the devices
        for it are queried; with ``prefix`` and ``device_id`` the interfaces of
        that device are queried.

        Returns:
            The ``prefixes`` list of the response, or "" on a non-200 status.
        """
        prefix = UrlEncode(UrlEncode(prefix))
        start = UrlEncode(start)
        end = UrlEncode(end)
        if prefix == "":
            url = f"{self.host}/api/{self.version}/traffic/prefixes"
        elif device_id == "":
            url = f"{self.host}/api/{self.version}/traffic/prefixes/{prefix}/devices"
        else:
            url = f"{self.host}/api/{self.version}/traffic/prefixes/{prefix}/devices/{device_id}/interfaces"
        url += f"?format=totals&timeStart={start}&timeEnd={end}"
        # Routed through DoApiCall (via the helper) so debug output covers this call too.
        return self._FetchJsonField(url, "prefixes")

    def GetInterfaceCounterTrafficTimeSeries(self, start: str, end: str, device_id: str, interface: str = ""):
        """Return interface counter time series for a device or one of its interfaces.

        Args:
            start: Start of the time range (sent as ``timeStart``).
            end: End of the time range (sent as ``timeEnd``).
            device_id: Device identifier used in the URL path.
            interface: When non-empty, limit the query to that interface.

        Returns:
            The ``devices`` list of the response, or "" on a non-200 status.
        """
        interface = UrlEncode(UrlEncode(interface))
        start = UrlEncode(start)
        end = UrlEncode(end)
        if interface == "":
            url = f"{self.host}/api/{self.version}/devices/{device_id}/statistics/totals"
        else:
            url = f"{self.host}/api/{self.version}/devices/{device_id}/interfaces/{interface}/statistics/totals"
        url += f"?format=timeseries&timeStart={start}&timeEnd={end}"
        # Routed through DoApiCall (via the helper) so debug output covers this call too.
        return self._FetchJsonField(url, "devices")

    def GetNetFlowTrafficTimeSeriesByDevice(self, start: str, end: str, device_id: str, interface: str,
                                            asn: int = 0, prefix: str = ""):
        """Return NetFlow traffic time series for an interface, by prefix and/or ASN.

        Args:
            start: Start of the time range (sent as ``timeStart``).
            end: End of the time range (sent as ``timeEnd``).
            device_id: Device identifier used in the URL path.
            interface: Interface name; URL-encoded twice before being put in the path.
            asn: ASN to query; 0 means "not given".
            prefix: Prefix to query; "" means "not given".

        Returns:
            The ``devices`` list of the response, or "" on a non-200 status.
        """
        interface = UrlEncode(UrlEncode(interface))
        prefix = UrlEncode(UrlEncode(prefix))
        start = UrlEncode(start)
        end = UrlEncode(end)
        base = f"{self.host}/api/{self.version}/traffic/devices/{device_id}/interfaces/{interface}"
        if asn == 0 and prefix != "":
            url = f"{base}/prefixes/{prefix}"
        elif asn != 0 and prefix == "":
            url = f"{base}/asns/{asn}"
        else:
            # NOTE(review): this branch is also reached when neither asn nor prefix
            # is given, producing ".../asns/0/prefixes/" - confirm whether that
            # call should be rejected instead.
            url = f"{base}/asns/{asn}/prefixes/{prefix}"
        url += f"?format=timeseries&timeStart={start}&timeEnd={end}"
        return self._FetchJsonField(url, "devices")