{
  "nbformat": 4,
  "nbformat_minor": 5,
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "name": "python",
      "version": "3.13.0"
    },
    "blog_metadata": {
      "topic": "How Fabric Eventstream’s New Connectors Change Real-Time Ingestion Strategy",
      "slug": "how-fabric-eventstream-s-new-connectors-change-real-time-ing",
      "generated_by": "LinkedIn Post Generator + Azure OpenAI",
      "generated_at": "2026-07-02T12:40:54.619Z"
    }
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# How Fabric Eventstream’s New Connectors Change Real-Time Ingestion Strategy\n",
        "\n",
        "Microsoft Fabric Eventstream changes where real-time ingestion can begin. Instead of assuming streaming integration must be solved outside Fabric first, teams can now validate whether Eventstream should serve as the native ingress, lightweight transformation, and routing layer for Fabric-bound analytics.\n",
        "\n",
        "This notebook turns that architectural argument into hands-on validation steps using Python. It focuses on Kafka-compatible smoke testing, delivery verification, burst simulation, and simple decision aids for evaluating when connector-first ingestion is enough."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "%pip install kafka-python pandas"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import os\n",
        "import json\n",
        "import time\n",
        "import random\n",
        "import socket\n",
        "from datetime import datetime, timezone\n",
        "\n",
        "import pandas as pd\n",
        "from kafka import KafkaProducer"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Architecture shift: from external glue to native ingress\n",
        "\n",
        "The main strategic change is not just that Fabric has more connectors. It is that Eventstream can now become the default starting point for many real-time analytics workloads, reducing the need for custom pollers, protocol bridges, and ad hoc routing layers outside Fabric.\n",
        "\n",
        "The cell below prints the before-and-after pattern from the post as two numbered lists."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "before_after = {\n",
        "    \"before\": [\n",
        "        \"Source-specific scripts\",\n",
        "        \"Polling jobs\",\n",
        "        \"Custom parsing\",\n",
        "        \"Ad hoc routing\",\n",
        "        \"Analytics stores\"\n",
        "    ],\n",
        "    \"after\": [\n",
        "        \"Eventstream connectors\",\n",
        "        \"Managed ingestion\",\n",
        "        \"Central transforms\",\n",
        "        \"Multiple Fabric destinations\"\n",
        "    ]\n",
        "}\n",
        "\n",
        "for phase, steps in before_after.items():\n",
        "    print(f\"\\n{phase.upper()} PATTERN\")\n",
        "    for i, step in enumerate(steps, start=1):\n",
        "        print(f\"  {i}. {step}\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Decision tree framing for Fabric ingestion choices\n",
        "\n",
        "The blog argues for a cleaner service boundary:\n",
        "- use Eventstream for event streaming,\n",
        "- mirroring for replication,\n",
        "- pipelines for scheduled movement,\n",
        "- shortcuts for virtualization.\n",
        "\n",
        "This small Python table makes that decision logic explicit so teams can discuss architecture choices consistently."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "decision_tree = pd.DataFrame([\n",
        "    {\"workload_type\": \"event streaming\", \"recommended_service\": \"Eventstream\", \"why\": \"native real-time ingestion, shaping, and routing\"},\n",
        "    {\"workload_type\": \"replication\", \"recommended_service\": \"Mirroring\", \"why\": \"near-real-time replication from supported operational stores\"},\n",
        "    {\"workload_type\": \"scheduled movement\", \"recommended_service\": \"Data Factory pipelines\", \"why\": \"batch orchestration and movement\"},\n",
        "    {\"workload_type\": \"access without copying\", \"recommended_service\": \"OneLake shortcuts\", \"why\": \"virtualization instead of movement\"},\n",
        "])\n",
        "\n",
        "decision_tree"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Required environment variables\n",
        "\n",
        "The Kafka-compatible smoke tests read the following environment variables:\n",
        "\n",
        "- `FABRIC_KAFKA_BOOTSTRAP` (in `host:port` form)\n",
        "- `FABRIC_KAFKA_USERNAME`\n",
        "- `FABRIC_KAFKA_PASSWORD`\n",
        "- `FABRIC_TOPIC_TELEMETRY`\n",
        "- `FABRIC_TOPIC_ORDERS`\n",
        "\n",
        "Optional:\n",
        "- `FABRIC_KAFKA_PORT` (defaults to `9093`; used only by the reachability probes when `FABRIC_KAFKA_BOOTSTRAP` omits a port. The producer helper does not read it, so include the port in `FABRIC_KAFKA_BOOTSTRAP`)\n",
        "\n",
        "These values should match your Fabric Eventstream Kafka endpoint and authentication configuration."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Validate local configuration before testing\n",
        "\n",
        "Before attempting to publish events, verify that the required settings exist. This Python version mirrors the intent of the PowerShell prerequisite checks from the post."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "required = [\n",
        "    \"FABRIC_KAFKA_BOOTSTRAP\",\n",
        "    \"FABRIC_KAFKA_USERNAME\",\n",
        "    \"FABRIC_KAFKA_PASSWORD\",\n",
        "    \"FABRIC_TOPIC_TELEMETRY\",\n",
        "    \"FABRIC_TOPIC_ORDERS\",\n",
        "]\n",
        "\n",
        "missing = [name for name in required if not os.getenv(name)]\n",
        "\n",
        "if missing:\n",
        "    print(\"Missing environment variables:\")\n",
        "    for name in missing:\n",
        "        print(f\" - {name}\")\n",
        "else:\n",
        "    print(\"All required Eventstream test settings are present\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Validate DNS and TCP reachability for the Kafka-compatible endpoint\n",
        "\n",
        "If DNS resolution or TCP connectivity to port 9093 fails, the architecture discussion is premature. This Python cell performs a lightweight reachability check similar to the PowerShell example in the post."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "bootstrap = os.getenv(\"FABRIC_KAFKA_BOOTSTRAP\", \"your-eventstream-kafka-endpoint:9093\")\n",
        "host, _, port_text = bootstrap.partition(\":\")\n",
        "port = int(port_text) if port_text else int(os.getenv(\"FABRIC_KAFKA_PORT\", \"9093\"))\n",
        "\n",
        "print(f\"Checking DNS resolution for {host}...\")\n",
        "try:\n",
        "    resolved_ip = socket.gethostbyname(host)\n",
        "    print(f\"DNS OK -> {resolved_ip}\")\n",
        "except Exception as e:\n",
        "    print(f\"DNS resolution failed: {e}\")\n",
        "\n",
        "print(f\"Checking TCP connectivity on port {port}...\")\n",
        "try:\n",
        "    with socket.create_connection((host, port), timeout=5):\n",
        "        print(\"TCP connectivity OK\")\n",
        "except Exception as e:\n",
        "    print(f\"TCP connectivity failed: {e}\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a reusable Kafka producer helper\n",
        "\n",
        "To keep later examples self-contained but consistent, this helper builds a `KafkaProducer` using environment variables. It uses JSON serialization for values and optionally supports keyed messages."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "def build_producer(with_keys=False):\n",
        "    config = {\n",
        "        \"bootstrap_servers\": os.getenv(\"FABRIC_KAFKA_BOOTSTRAP\", \"your-eventstream-kafka-endpoint:9093\"),\n",
        "        \"security_protocol\": \"SASL_SSL\",\n",
        "        \"sasl_mechanism\": \"PLAIN\",\n",
        "        \"sasl_plain_username\": os.getenv(\"FABRIC_KAFKA_USERNAME\", \"$ConnectionString\"),\n",
        "        \"sasl_plain_password\": os.getenv(\"FABRIC_KAFKA_PASSWORD\", \"Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...\"),\n",
        "        \"value_serializer\": lambda v: json.dumps(v).encode(\"utf-8\"),\n",
        "    }\n",
        "    if with_keys:\n",
        "        config[\"key_serializer\"] = lambda k: k.encode(\"utf-8\")\n",
        "    return KafkaProducer(**config)\n",
        "\n",
        "print(\"Producer helper ready\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Smoke test: send JSON events to Eventstream quickly\n",
        "\n",
        "This is the first practical validation step from the blog. The goal is not production readiness; it is to prove that a few well-formed events can reach Fabric through a Kafka-compatible path with minimal setup."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "producer = build_producer()\n",
        "topic = os.getenv(\"FABRIC_TOPIC_TELEMETRY\", \"telemetry\")\n",
        "\n",
        "pending = []\n",
        "try:\n",
        "    for i in range(3):\n",
        "        event = {\n",
        "            \"deviceId\": f\"sensor-{i}\",\n",
        "            \"temperature\": 20 + i,\n",
        "            \"eventTime\": datetime.now(timezone.utc).isoformat(),\n",
        "        }\n",
        "        # send() is asynchronous, so keep each future to confirm delivery later\n",
        "        pending.append(producer.send(topic, event))\n",
        "        print(f\"queued -> {event}\")\n",
        "        time.sleep(1)\n",
        "\n",
        "    producer.flush()\n",
        "    # get() raises if a record was rejected or never acknowledged\n",
        "    for future in pending:\n",
        "        future.get(timeout=10)\n",
        "    print(\"Smoke test complete\")\n",
        "finally:\n",
        "    producer.close()"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Add callbacks and keys to validate publish success and partition behavior\n",
        "\n",
        "Once the basic path works, the next step is to verify delivery metadata and keyed publishing. This helps teams confirm that messages are accepted and observe partition assignment behavior."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "def on_send_success(record_metadata):\n",
        "    print(\n",
        "        f\"delivered topic={record_metadata.topic} \"\n",
        "        f\"partition={record_metadata.partition} offset={record_metadata.offset}\"\n",
        "    )\n",
        "\n",
        "def on_send_error(excp):\n",
        "    print(f\"publish_failed error={excp}\")\n",
        "\n",
        "producer = build_producer(with_keys=True)\n",
        "topic = os.getenv(\"FABRIC_TOPIC_TELEMETRY\", \"telemetry\")\n",
        "\n",
        "try:\n",
        "    future = producer.send(topic, key=\"sensor-42\", value={\"status\": \"ok\", \"rpm\": 1800})\n",
        "    future.add_callback(on_send_success)\n",
        "    future.add_errback(on_send_error)\n",
        "    producer.flush()\n",
        "finally:\n",
        "    producer.close()"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Simulate a bursty workload\n",
        "\n",
        "The blog recommends testing burst behavior to challenge older polling instincts. This example sends ten order events with randomized values and gaps of 50 to 300 ms, which exercises the path rather than stressing it. To judge whether Eventstream can absorb your peak rate without a separate ingress layer, raise the event count and shorten or remove the sleep until the test approaches your expected load."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "producer = build_producer()\n",
        "topic = os.getenv(\"FABRIC_TOPIC_ORDERS\", \"orders\")\n",
        "\n",
        "sent = []\n",
        "try:\n",
        "    for _ in range(10):\n",
        "        payload = {\n",
        "            \"orderId\": random.randint(1000, 9999),\n",
        "            \"amount\": round(random.uniform(10, 500), 2),\n",
        "            \"eventTime\": datetime.now(timezone.utc).isoformat(),\n",
        "        }\n",
        "        producer.send(topic, payload)\n",
        "        sent.append(payload)\n",
        "        time.sleep(random.uniform(0.05, 0.3))\n",
        "\n",
        "    producer.flush()\n",
        "    print(\"Burst test complete\")\n",
        "finally:\n",
        "    producer.close()\n",
        "\n",
        "# A bare expression renders only as the last statement of a cell\n",
        "pd.DataFrame(sent)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Connector-first ingestion flow\n",
        "\n",
        "This notebook-friendly representation captures the core target architecture from the post: operational sources feed Eventstream connectors, which handle real-time transform and routing into Fabric destinations and downstream consumers."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "flow = {\n",
        "    \"Operational Sources\": [\"Fabric Eventstream Connectors\"],\n",
        "    \"Fabric Eventstream Connectors\": [\"Real-time Transform / Route\"],\n",
        "    \"Real-time Transform / Route\": [\"Lakehouse / KQL DB / Activator\", \"Downstream Consumers\"],\n",
        "    \"Legacy Pattern: Custom Pollers + Glue Code\": [\"replaced by -> Fabric Eventstream Connectors\"],\n",
        "    \"Kafka-compatible Test Producer\": [\"Fabric Eventstream Connectors\"],\n",
        "}\n",
        "\n",
        "for source, targets in flow.items():\n",
        "    for target in targets:\n",
        "        print(f\"{source} -> {target}\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Sequence view of ingestion, shaping, and persistence\n",
        "\n",
        "The post also emphasizes that new connectors reduce custom ingestion code by standardizing the path from source system to Eventstream and then into transform and destination stages. This cell renders that sequence as structured steps."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "sequence_steps = [\n",
        "    (\"Source System\", \"Eventstream Connector\", \"Emit operational events\"),\n",
        "    (\"Eventstream Connector\", \"Fabric Eventstream\", \"Standardized ingestion\"),\n",
        "    (\"Fabric Eventstream\", \"Transform/Route\", \"Filter / enrich / branch\"),\n",
        "    (\"Transform/Route\", \"Destination\", \"Persist to analytics target\"),\n",
        "]\n",
        "\n",
        "for sender, receiver, action in sequence_steps:\n",
        "    print(f\"{sender} -> {receiver}: {action}\")\n",
        "\n",
        "print(\"Note: New connectors reduce custom ingestion code\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Rollout readiness report\n",
        "\n",
        "A connector-first strategy still requires operational discipline. This simple readiness report mirrors the blog’s rollout checklist idea and can be adapted for team reviews."
      ]
    },
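    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "# Probe the endpoint here so the report reflects this environment\n",
        "bootstrap = os.getenv(\"FABRIC_KAFKA_BOOTSTRAP\", \"\")\n",
        "host, _, port_text = bootstrap.partition(\":\")\n",
        "port = int(port_text) if port_text else int(os.getenv(\"FABRIC_KAFKA_PORT\", \"9093\"))\n",
        "\n",
        "def succeeds(probe):\n",
        "    try:\n",
        "        probe()\n",
        "        return True\n",
        "    except OSError:\n",
        "        return False\n",
        "\n",
        "checks = {\n",
        "    \"DnsResolved\": bool(host) and succeeds(lambda: socket.gethostbyname(host)),\n",
        "    \"Tcp9093Open\": bool(host) and succeeds(lambda: socket.create_connection((host, port), timeout=5).close()),\n",
        "    \"SecretsStored\": bool(os.getenv(\"FABRIC_KAFKA_PASSWORD\")),\n",
        "    \"TopicPlanned\": bool(os.getenv(\"FABRIC_TOPIC_TELEMETRY\")) and bool(os.getenv(\"FABRIC_TOPIC_ORDERS\")),\n",
        "    \"MonitoringDefined\": False,  # set manually once monitoring is agreed\n",
        "}\n",
        "\n",
        "report = pd.DataFrame([\n",
        "    {\"Check\": key, \"Status\": \"Pass\" if value else \"Action Required\"}\n",
        "    for key, value in checks.items()\n",
        "])\n",
        "\n",
        "report"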
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Evaluate when Eventstream is enough\n",
        "\n",
        "The post argues that Eventstream should be the default first design option for Fabric-native real-time analytics, but not the answer to every integration problem. The criteria table below helps teams reason about fit: rows marked `True` describe workload characteristics that favor Eventstream, and rows marked `False` describe characteristics that point toward a broader integration layer."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "criteria = pd.DataFrame([\n",
        "    {\"criterion\": \"throughput is moderate rather than extreme\", \"supports_eventstream\": True},\n",
        "    {\"criterion\": \"event contracts are reasonably stable\", \"supports_eventstream\": True},\n",
        "    {\"criterion\": \"primary destination is Fabric\", \"supports_eventstream\": True},\n",
        "    {\"criterion\": \"goal is analytics, alerting, or activation\", \"supports_eventstream\": True},\n",
        "    {\"criterion\": \"team wants fast time to value with managed ingestion\", \"supports_eventstream\": True},\n",
        "    {\"criterion\": \"strict replay guarantees are central\", \"supports_eventstream\": False},\n",
        "    {\"criterion\": \"high-scale decoupling across many consumers is required\", \"supports_eventstream\": False},\n",
        "    {\"criterion\": \"protocol mediation is the main challenge\", \"supports_eventstream\": False},\n",
        "    {\"criterion\": \"workload is enterprise integration first and analytics second\", \"supports_eventstream\": False},\n",
        "])\n",
        "\n",
        "criteria"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Tradeoff checklist for architects\n",
        "\n",
        "Native ingestion simplifies architecture, but it does not remove governance, replay planning, schema discipline, or transformation boundary decisions. Use this checklist to capture the first area most likely to break in your environment."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "tradeoffs = pd.DataFrame([\n",
        "    {\"area\": \"Governance\", \"question\": \"Are source permissions, contracts, classification, and policy enforcement defined?\"},\n",
        "    {\"area\": \"Replay and durability\", \"question\": \"Do you need strict replay semantics, long retention, or broad decoupling?\"},\n",
        "    {\"area\": \"Schema drift\", \"question\": \"Can downstream consumers tolerate contract changes?\"},\n",
        "    {\"area\": \"Transformation boundaries\", \"question\": \"Are you limiting Eventstream to lightweight shaping rather than deep orchestration?\"},\n",
        "    {\"area\": \"Burst handling\", \"question\": \"Can Eventstream absorb peak rates without a separate ingress layer?\"},\n",
        "    {\"area\": \"Team ownership\", \"question\": \"Will moving ingestion inward simplify or complicate operational ownership?\"},\n",
        "])\n",
        "\n",
        "tradeoffs"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Next Steps\n",
        "\n",
        "Fabric Eventstream’s connector model changes real-time ingestion strategy by moving the integration boundary inward. For many Fabric-bound streaming workloads, the right first question is no longer how to build an external ingress stack, but whether Eventstream can already handle source connectivity, lightweight shaping, and routing well enough.\n",
        "\n",
        "Use this notebook to validate three things in your environment: network reachability, successful Kafka-compatible publishing, and tolerance for bursty traffic. Then review the governance, replay, schema, and ownership tradeoffs before deciding whether Eventstream is enough or whether a broader integration layer is still justified."
      ]
    }
  ]
}