Einleitung
In diesem Dokument wird beschrieben, wie die Event Stream-Funktion für Advanced Malware Protection (AMP) für Endgeräte konfiguriert und verwendet wird.
Voraussetzungen
Anforderungen
Cisco empfiehlt, dass Sie über Kenntnisse in den folgenden Themen verfügen:
- AMP für Endgeräte
- Grundkenntnisse der Python-Programmierung
Verwendete Komponenten
Die Informationen in diesem Dokument basieren auf Python 3.7 mit den externen Bibliotheken pika (Version 1.1.0) und Requests (Version 2.2.0).
Die Informationen in diesem Dokument beziehen sich auf Geräte in einer speziell eingerichteten Testumgebung. Alle Geräte, die in diesem Dokument benutzt wurden, begannen mit einer gelöschten (Nichterfüllungs) Konfiguration. Wenn Ihr Netzwerk in Betrieb ist, stellen Sie sicher, dass Sie die möglichen Auswirkungen aller Befehle kennen.
Konfigurieren
Netzwerkdiagramm
Dieses Bild zeigt ein Beispiel für die Ereignisstream-Sequenzierung:

Konfigurationen
Erstellen von API-Anmeldeinformationen
- Navigieren Sie zu Ihrem AMP für Endgeräte-Portal, und melden Sie sich an.
- Wählen Sie unter Accounts (Konten) den Eintrag API Credentials (API-Anmeldedaten) aus.
- Klicken Sie auf Neue API-Anmeldeinformationen
- Geben Sie im Feld Anwendungsname einen Wert ein.
- Wählen Sie Lesen und Schreiben für Bereich
- Klicken Sie auf Erstellen
- Speichern Sie diese Anmeldeinformationen in einem Passwort-Manager oder einer verschlüsselten Datei.
Ereignisstream erstellen
- Öffnen Sie eine Python-Shell, und importieren Sie die Bibliotheken json, ssl, pika und request.
import json
import pika
import requests
import ssl
- Speichern Sie die Werte für url, client_id und api_key. Ihre URL kann variieren, wenn Sie die nordamerikanische Cloud nicht verwenden. Ihre client_id und api_key sind einzigartig in Ihrer Umgebung.
url = "https://api.amp.cisco.com/v1/event_streams"
client_id = "d16aff14860af496e848"
api_key = "d01ed435-b00d-4a4d-a299-1806ac117e72"
- Erstellen Sie das Datenobjekt, das an die Anforderung übergeben werden soll. Dies muss einen Namen enthalten und kann event_type und group_guid enthalten, um die im Stream enthaltenen Ereignisse und Gruppen einzuschränken. Wenn group_guid oder event_type nicht übergeben wird, enthält der Ereignisstream alle Gruppen und Ereignistypen.
data = {
"name": "Event Stream for ACME Inc",
"group_guid": ["5cdf70dd-1b14-46a0-be90-e08da14172d8"],
"event_type": [1090519054]
}
- Führen Sie den POST-Anforderungsaufruf aus, und speichern Sie den Wert in einer Variablen.
r = requests.post(
url = url,
data = data,
auth = (client_id, api_key)
)
- Drucken Sie den Statuscode aus. Bestätigen Sie, dass der Code 201 lautet.
print(r.status_code)
- Laden Sie den Inhalt der Antwort in ein json-Objekt, und speichern Sie dieses Objekt in einer Variablen.
j = json.loads(r.content)
- Überprüfen Sie den Inhalt der Antwortdaten.
for k, v in j.items():
print(f"{k}: {v}")
- Die AMQP-Daten (Advanced Message Queuing Protocol) befinden sich in der Antwort. Extrahieren Sie die Daten in die entsprechenden Variablen.
user_name = j["data"]["amqp_credentials"]["user_name"]
queue_name = j["data"]["amqp_credentials"]["queue_name"]
password = j["data"]["amqp_credentials"]["password"]
host = j["data"]["amqp_credentials"]["host"]
port = j["data"]["amqp_credentials"]["port"]
proto = j["data"]["amqp_credentials"]["proto"]
- Definieren Sie eine Rückruffunktion mit diesen Parametern. In dieser Konfiguration wird der Text des Ereignisses auf den Bildschirm gedruckt. Sie können den Inhalt dieser Funktion jedoch an Ihre Ziele anpassen.
def callback(channel, method, properties, body):
print(body)
- Bereiten Sie die AMQP-URL anhand der von Ihnen erstellten Variablen vor.
amqp_url = f"amqps://{user_name}:{password}@{host}:{port}"
- SSL-Kontext vorbereiten
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
amqp_ssl = pika.SSLOptions(context)
- Bereiten Sie den AMQP-Stream mit den pika-Bibliotheksmethoden vor.
params = pika.URLParameters(amqp_url)
params.ssl_options = amqp_ssl
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_consume(
queue_name,
callback,
auto_ack = False
)
- Initiieren Sie den Stream.
channel.start_consuming()
- Der Stream ist jetzt live und wartet auf Veranstaltungen.
Überprüfung
Auslösen eines Ereignisses an einem Endpunkt in Ihrer Umgebung. Starten Sie z. B. einen Flash-Scan. Beachten Sie, dass der Stream die Ereignisdaten auf dem Bildschirm ausgibt.
Drücken Sie Strg+C (Windows) oder Command-C (Mac), um den Stream zu unterbrechen.
Fehlerbehebung
Statuscodes
- Ein Statuscode von 401 weist auf ein Autorisierungsproblem hin. Überprüfen Sie Ihre client_id und api_key, oder generieren Sie neue Schlüssel.
- Ein Statuscode von 400 weist auf ein Problem mit einer fehlerhaften Anforderung hin. Stellen Sie sicher, dass keine Ereignisüberwachung mit diesem Namen erstellt wurde bzw. dass nicht mehr als fünf Ereignisüberwachungen erstellt wurden.
- Eine weitere mögliche Abhilfe für den Statuscode 400 wäre die Hinzufügung der folgenden Variablen:
headers = {
'content-type': 'application/json'
}
und aktualisieren Sie Ihre Postanforderung, um die Kopfzeile Deklaration widerzuspiegeln:
r = requests.post(
url = url,
headers = headers,
data = data,
auth = (client_id, api_key)
)