@@ -0,0 +1,6 @@
---
kind: fix
date: 2025-12-16
---

* Fix an issue to ensure that hosts are consistently removed from Ops Manager monitoring during MongoDB and AppDB scale-down events.
Comment on lines +1 to +6
Collaborator

LGTM!

11 changes: 11 additions & 0 deletions controllers/om/deployment.go
@@ -531,6 +531,17 @@ func (d Deployment) GetAllHostnames() []string {
return hostnames
}

// GetHostnamesForReplicaSet returns all hostnames for processes belonging to a specific replica set.
func (d Deployment) GetHostnamesForReplicaSet(rsName string) []string {
var hostnames []string
for _, p := range d.getProcesses() {
if p.replicaSetName() == rsName {
hostnames = append(hostnames, p.HostName())
}
}
return hostnames
}

func (d Deployment) NumberOfProcesses() int {
return len(d.getProcesses())
}
39 changes: 38 additions & 1 deletion controllers/om/host/monitoring.go
@@ -62,8 +62,45 @@ func stopMonitoringHosts(getRemover GetRemover, hosts []string, log *zap.Sugared
return nil
}

// CalculateDiffAndStopMonitoringHosts checks hosts that are present in hostsBefore but not hostsAfter, and removes
// CalculateDiffAndStopMonitoring checks hosts that are present in hostsBefore but not hostsAfter, and removes
// monitoring from them.
func CalculateDiffAndStopMonitoring(getRemover GetRemover, hostsBefore, hostsAfter []string, log *zap.SugaredLogger) error {
return stopMonitoringHosts(getRemover, util.FindLeftDifference(hostsBefore, hostsAfter), log)
}

// GetAllMonitoredHostnames returns all hostnames currently monitored in the OM project.
//
// Note: This relies on the constraint that one OM project = one deployment, which is enforced
// by the MongoDB Kubernetes operator. All monitored hosts in the project belong to this deployment.
//
// The OM API supports server-side filtering via the clusterId query parameter:
// GET /groups/{PROJECT-ID}/hosts?clusterId={CLUSTER-ID}
// See: https://www.mongodb.com/docs/ops-manager/current/reference/api/hosts/get-all-hosts-in-group/
// If the cluster ID were reliably available here, we could use that server-side filtering instead.
func GetAllMonitoredHostnames(getter Getter) ([]string, error) {
allHosts, err := getter.GetHosts()
if err != nil {
return nil, xerrors.Errorf("failed to get hosts from OM: %w", err)
}

hostnames := make([]string, len(allHosts.Results))
for i, h := range allHosts.Results {
hostnames[i] = h.Hostname
}
return hostnames, nil
}

// RemoveUndesiredMonitoringHosts ensures only the desired hosts are monitored.
// It compares all monitored hosts in the OM project against the desired list and removes any extras.
// This is idempotent and should be called on every reconciliation to clean up orphaned hosts.
//
// Note: This relies on the constraint that one OM project = one deployment.
// All monitored hosts in the project belong to this deployment, so no filtering is needed.
func RemoveUndesiredMonitoringHosts(getRemover GetRemover, hostsDesired []string, log *zap.SugaredLogger) error {
hostsMonitored, err := GetAllMonitoredHostnames(getRemover)
if err != nil {
return err
}

return CalculateDiffAndStopMonitoring(getRemover, hostsMonitored, hostsDesired, log)
}
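
The two helpers above boil down to a diff-and-remove pattern: list everything OM currently monitors, subtract the desired hosts, and stop monitoring the remainder. Below is a minimal, self-contained sketch of that pattern; the types and the `removeHost` callback are illustrative stand-ins, not the operator's `GetRemover` interface.

```go
package main

import "fmt"

// leftDifference returns the elements of before that are missing from after,
// an illustrative re-implementation of what util.FindLeftDifference is used for above.
func leftDifference(before, after []string) []string {
	keep := make(map[string]struct{}, len(after))
	for _, h := range after {
		keep[h] = struct{}{}
	}
	var diff []string
	for _, h := range before {
		if _, ok := keep[h]; !ok {
			diff = append(diff, h)
		}
	}
	return diff
}

func main() {
	// Hosts OM currently monitors for the project (one project == one deployment).
	monitored := []string{"rs-0.svc", "rs-1.svc", "rs-2.svc", "rs-3.svc", "rs-4.svc"}
	// Hosts the operator still wants after scaling down to three members.
	desired := []string{"rs-0.svc", "rs-1.svc", "rs-2.svc"}

	// Stand-in for stopMonitoringHosts: the real code resolves each hostname to a
	// host ID and removes it via the OM /hosts API.
	removeHost := func(hostname string) { fmt.Println("stop monitoring:", hostname) }

	for _, h := range leftDifference(monitored, desired) {
		removeHost(h) // prints rs-3.svc and rs-4.svc
	}
}
```
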
35 changes: 26 additions & 9 deletions controllers/om/mockedomclient.go
@@ -63,8 +63,11 @@ type MockedOmConnection struct {
backupAgentConfig *BackupAgentConfig
monitoringAgentConfig *MonitoringAgentConfig
controlledFeature *controlledfeature.ControlledFeature
// hosts are used for both automation agents and monitoring endpoints.
// They are necessary for emulating "agents" are ready behavior as operator checks for hosts for agents to exist
// In Ops Manager, "hosts" and "automation agents" are two different things:
Collaborator Author

Without fixing the mock (separating the two agent types), some sharded cluster tests were failing after these changes:

  • TestMultiClusterShardedScalingWithOverrides
  • TestMultiClusterShardedScaling
  • TestReconcileCreateShardedCluster
  • [...]

// - hostResults: the monitored hosts shown in the OM UI (via /hosts API)
// - agentHostnameMap: the automation agents that ping OM (via /agents/AUTOMATION API)
// When we remove a host from monitoring (e.g. during scale down), the automation
// agent on that host doesn't just disappear - it keeps running until the pod is deleted.
hostResults *host.Result
agentHostnameMap map[string]struct{}

@@ -168,6 +171,11 @@ func NewEmptyMockedOmConnection(ctx *OMContext) Connection {
func NewMockedOmConnection(d Deployment) *MockedOmConnection {
connection := MockedOmConnection{deployment: d}
connection.hostResults = buildHostsFromDeployment(d)
// Also populate agentHostnameMap so the mock knows which agents are "registered"
connection.agentHostnameMap = make(map[string]struct{})
for _, h := range connection.hostResults.Results {
connection.agentHostnameMap[h.Hostname] = struct{}{}
}
connection.BackupConfigs = make(map[string]*backup.Config)
connection.BackupHostClusters = make(map[string]*backup.HostCluster)
connection.SnapshotSchedules = make(map[string]*backup.SnapshotSchedule)
@@ -377,8 +385,16 @@ func (oc *MockedOmConnection) ReadUpdateAutomationConfig(modifyACFunc func(ac *A
return err
}

func (oc *MockedOmConnection) AddHost(host host.Host) error {
oc.hostResults.Results = append(oc.hostResults.Results, host)
func (oc *MockedOmConnection) AddHost(h host.Host) error {
// Generate an ID if not set (like the real OM API would do)
if h.Id == "" {
if oc.agentHostnameMap == nil {
oc.agentHostnameMap = map[string]struct{}{}
}
h.Id = strconv.Itoa(len(oc.hostResults.Results))
oc.agentHostnameMap[h.Hostname] = struct{}{}
}
oc.hostResults.Results = append(oc.hostResults.Results, h)
return nil
}

@@ -473,10 +489,12 @@ func (oc *MockedOmConnection) ReadAutomationAgents(pageNum int) (Paginated, erro
return oc.ReadAutomationAgentsFunc(pageNum)
}

// We use agentHostnameMap here, not hostResults. In real OM, the /agents/AUTOMATION
// endpoint returns agents based on their heartbeats, independent of the /hosts endpoint.
results := make([]AgentStatus, 0)
for _, r := range oc.hostResults.Results {
for hostname := range oc.agentHostnameMap {
results = append(results,
AgentStatus{Hostname: r.Hostname, LastConf: time.Now().Add(time.Second * -1).Format(time.RFC3339)})
AgentStatus{Hostname: hostname, LastConf: time.Now().Add(time.Second * -1).Format(time.RFC3339)})
}

return AutomationAgentStatusResponse{AutomationAgents: results}, nil
@@ -496,9 +514,8 @@ func (oc *MockedOmConnection) RemoveHost(hostID string) error {
}
}
oc.hostResults = &host.Result{Results: toKeep}
oc.agentHostnameMap = util.TransformToMap(oc.hostResults.Results, func(obj host.Host, idx int) (string, struct{}) {
return obj.Hostname, struct{}{}
})
// We don't touch agentHostnameMap here - in real OM, removing a host from monitoring
// doesn't unregister its automation agent. The agent keeps pinging until the pod dies.
return nil
}
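
To make the hosts/agents split concrete, here is a self-contained sketch (hypothetical types, not the mock's actual fields) of the behaviour the mock now emulates: removing a host from monitoring leaves its automation agent registered until the pod is deleted.

```go
package main

import "fmt"

// fakeOM keeps monitored hosts and registered agents as separate collections,
// mirroring the roles of hostResults and agentHostnameMap in the mock above.
type fakeOM struct {
	monitoredHosts map[string]struct{}
	agents         map[string]struct{}
}

func (o *fakeOM) addHost(hostname string) {
	o.monitoredHosts[hostname] = struct{}{}
	o.agents[hostname] = struct{}{} // registering a host also registers its agent
}

// removeHost stops monitoring only; the agent keeps "pinging" until the pod is deleted.
func (o *fakeOM) removeHost(hostname string) {
	delete(o.monitoredHosts, hostname)
}

func main() {
	om := &fakeOM{monitoredHosts: map[string]struct{}{}, agents: map[string]struct{}{}}
	om.addHost("rs-2.svc")
	om.removeHost("rs-2.svc")

	_, monitored := om.monitoredHosts["rs-2.svc"]
	_, agentAlive := om.agents["rs-2.svc"]
	fmt.Println(monitored, agentAlive) // false true
}
```
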

16 changes: 16 additions & 0 deletions controllers/operator/appdbreplicaset_controller.go
@@ -1497,6 +1497,22 @@ func (r *ReconcileAppDbReplicaSet) registerAppDBHostsWithProject(hostnames []str
}
}
}

// Remove hosts that are no longer in the desired list (scale-down scenario)
desiredHostnames := make(map[string]struct{})
for _, h := range hostnames {
desiredHostnames[h] = struct{}{}
}

for _, existingHost := range getHostsResult.Results {
if _, wanted := desiredHostnames[existingHost.Hostname]; !wanted {
log.Debugf("Removing AppDB host %s from monitoring as it's no longer needed", existingHost.Hostname)
if err := conn.RemoveHost(existingHost.Id); err != nil {
return xerrors.Errorf("error removing appdb host %s: %w", existingHost.Hostname, err)
}
}
}

return nil
}

103 changes: 103 additions & 0 deletions controllers/operator/appdbreplicaset_controller_multi_test.go
@@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -31,6 +32,7 @@ import (
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1/common"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/automationconfig"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/annotations"
kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/configmap"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/secret"
"github.com/mongodb/mongodb-kubernetes/pkg/dns"
@@ -1780,3 +1782,104 @@ func TestAppDBMultiClusterServiceCreation_WithExternalName(t *testing.T) {
})
}
}

// TestAppDBMultiCluster_ScaleDown_HostsRemovedFromMonitoring verifies that hosts are removed from monitoring
// when scaling down.
func TestAppDBMultiCluster_ScaleDown_HostsRemovedFromMonitoring(t *testing.T) {
ctx := context.Background()
log := zap.S()
centralClusterName := multicluster.LegacyCentralClusterName
memberClusterName1 := "member-cluster-1"
memberClusterName2 := "member-cluster-2"
clusters := []string{centralClusterName, memberClusterName1, memberClusterName2}

builder := DefaultOpsManagerBuilder().
SetName("om").
SetNamespace("ns").
SetAppDBClusterSpecList(mdbv1.ClusterSpecList{
{
ClusterName: memberClusterName1,
Members: 3,
},
{
ClusterName: memberClusterName2,
Members: 2,
},
}).
SetAppDbMembers(0).
SetAppDBTopology(mdbv1.ClusterTopologyMultiCluster)

opsManager := builder.Build()
// Use addOMHosts=false to prevent the interceptor from re-adding hosts when
// StatefulSets are fetched during scale-down. This allows testing host removal.
omConnectionFactory := om.NewDefaultCachedOMConnectionFactory()
fakeClient := mock.NewEmptyFakeClientBuilder().
WithObjects(opsManager).
WithObjects(mock.GetDefaultResources()...).
WithInterceptorFuncs(interceptor.Funcs{
Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, false),
}).Build()
kubeClient := kubernetesClient.NewClient(fakeClient)
globalClusterMap := getAppDBFakeMultiClusterMapWithClusters(clusters[1:], omConnectionFactory)

err := createOpsManagerUserPasswordSecret(ctx, kubeClient, opsManager, opsManagerUserPassword)
assert.NoError(t, err)

reconciler, err := newAppDbMultiReconciler(ctx, kubeClient, opsManager, globalClusterMap, log, omConnectionFactory.GetConnectionFunc)
require.NoError(t, err)

reconcileResult, err := reconciler.ReconcileAppDB(ctx, opsManager)
require.NoError(t, err)
assert.True(t, reconcileResult.Requeue)

createOMAPIKeySecret(ctx, t, reconciler.SecretClient, opsManager)

reconciler, err = newAppDbMultiReconciler(ctx, kubeClient, opsManager, globalClusterMap, log, omConnectionFactory.GetConnectionFunc)
require.NoError(t, err)
reconcileResult, err = reconciler.ReconcileAppDB(ctx, opsManager)
require.NoError(t, err)
require.False(t, reconcileResult.Requeue)

initialHostnames := []string{
"om-db-0-0-svc.ns.svc.cluster.local",
"om-db-0-1-svc.ns.svc.cluster.local",
"om-db-0-2-svc.ns.svc.cluster.local",
"om-db-1-0-svc.ns.svc.cluster.local",
"om-db-1-1-svc.ns.svc.cluster.local",
}

assertExpectedHostnamesAndPreferred(t, omConnectionFactory.GetConnection().(*om.MockedOmConnection), initialHostnames)

opsManager.Spec.AppDB.ClusterSpecList = mdbv1.ClusterSpecList{
{
ClusterName: memberClusterName1,
Members: 2,
},
{
ClusterName: memberClusterName2,
Members: 1,
},
}

for i := 0; i < 2; i++ {
reconciler, err = newAppDbMultiReconciler(ctx, kubeClient, opsManager, globalClusterMap, log, omConnectionFactory.GetConnectionFunc)
require.NoError(t, err)
_, err = reconciler.ReconcileAppDB(ctx, opsManager)
require.NoError(t, err)
}

expectedHostnamesAfterScaleDown := []string{
"om-db-0-0-svc.ns.svc.cluster.local",
"om-db-0-1-svc.ns.svc.cluster.local",
"om-db-1-0-svc.ns.svc.cluster.local",
}

// Only check hosts (not preferred hostnames) after scale-down, because the API
// doesn't support removing preferred hostnames.
// The important thing for monitoring is that the hosts are removed.
omConnection := omConnectionFactory.GetConnection().(*om.MockedOmConnection)
hosts, _ := omConnection.GetHosts()
assert.Equal(t, expectedHostnamesAfterScaleDown, util.Transform(hosts.Results, func(obj host.Host) string {
return obj.Hostname
}), "the AppDB hosts should have been removed after scale-down")
}
13 changes: 13 additions & 0 deletions controllers/operator/appdbreplicaset_controller_test.go
@@ -344,6 +344,19 @@ func TestRegisterAppDBHostsWithProject(t *testing.T) {
hosts, _ := omConnectionFactory.GetConnection().GetHosts()
assert.Len(t, hosts.Results, 5)
})

t.Run("Ensure hosts are removed when scaled down", func(t *testing.T) {
opsManager.Spec.AppDB.Members = 3
_, err = reconciler.ReconcileAppDB(ctx, opsManager)

hostnames := reconciler.getCurrentStatefulsetHostnames(opsManager)
err = reconciler.registerAppDBHostsWithProject(hostnames, omConnectionFactory.GetConnection(), "password", zap.S())
assert.NoError(t, err)

// After scale-down, hosts should be removed from monitoring
hosts, _ := omConnectionFactory.GetConnection().GetHosts()
assert.Len(t, hosts.Results, 3, "Expected 3 hosts after scaling down from 5 to 3 members")
})
}

func TestEnsureAppDbAgentApiKey(t *testing.T) {
46 changes: 35 additions & 11 deletions controllers/operator/mongodbmultireplicaset_controller.go
@@ -712,15 +712,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte
return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client)
}

// updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated
// to automation agents in containers
func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, mrs mdbmultiv1.MongoDBMultiCluster, agentCertPath, tlsCertPath, internalClusterCertPath string, isRecovering bool, log *zap.SugaredLogger) error {
reachableHostnames := make([]string, 0)

clusterSpecList, err := mrs.GetClusterSpecItems()
if err != nil {
return err
}
// getAllHostnames returns the hostnames of all replicas across all clusters.
// Unhealthy clusters are ignored when reachableClustersOnly is set to true.
func (r *ReconcileMongoDbMultiReplicaSet) getAllHostnames(mrs mdbmultiv1.MongoDBMultiCluster, clusterSpecList mdb.ClusterSpecList, reachableClustersOnly bool, log *zap.SugaredLogger) ([]string, error) {
Collaborator Author

I extracted this logic, which was previously inline in "updateOmDeploymentRs", into a subfunction.

var hostnames []string
failedClusterNames, err := mrs.GetFailedClusterNames()
if err != nil {
// When failing to retrieve the list of failed clusters we proceed assuming there are no failed clusters,
@@ -729,15 +724,33 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
}
for _, spec := range clusterSpecList {
hostnamesToAdd := dns.GetMultiClusterProcessHostnames(mrs.Name, mrs.Namespace, mrs.ClusterNum(spec.ClusterName), spec.Members, mrs.Spec.GetClusterDomain(), mrs.Spec.GetExternalDomainForMemberCluster(spec.ClusterName))
if stringutil.Contains(failedClusterNames, spec.ClusterName) {

if stringutil.Contains(failedClusterNames, spec.ClusterName) && reachableClustersOnly {
log.Debugf("Skipping hostnames %+v as they are part of the failed cluster %s ", hostnamesToAdd, spec.ClusterName)
continue
}
if mrs.GetClusterSpecByName(spec.ClusterName) == nil {
log.Debugf("Skipping hostnames %+v as they are part of a cluster not known by the operator %s ", hostnamesToAdd, spec.ClusterName)
continue
}
reachableHostnames = append(reachableHostnames, hostnamesToAdd...)
hostnames = append(hostnames, hostnamesToAdd...)
}

return hostnames, nil
}

// updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated
// to automation agents in containers
func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, mrs mdbmultiv1.MongoDBMultiCluster, agentCertPath, tlsCertPath, internalClusterCertPath string, isRecovering bool, log *zap.SugaredLogger) error {
// This clusterSpecList reflects the desired state for this reconciliation, not the final one (the resource spec)
clusterSpecList, err := mrs.GetClusterSpecItems()
if err != nil {
return err
}

reachableHostnames, err := r.getAllHostnames(mrs, clusterSpecList, true, log)
if err != nil {
return err
}

err = agents.WaitForRsAgentsToRegisterSpecifiedHostnames(conn, reachableHostnames, log)
Expand Down Expand Up @@ -814,6 +827,17 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
if err := om.WaitForReadyState(conn, reachableProcessNames, isRecovering, log); err != nil && !isRecovering {
return err
}

// The hostnames we get here are the ones for the current reconciliation, not the final state.
// Note that we include unhealthy clusters (we don't want to remove their hosts from monitoring).
allHostNames, err := r.getAllHostnames(mrs, clusterSpecList, false, log)
if err != nil && !isRecovering {
return err
}
if err := host.RemoveUndesiredMonitoringHosts(conn, allHostNames, log); err != nil {
log.Warnf("failed to remove stale host(s) from Ops Manager monitoring: %s", err.Error())
}

return nil
}
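
The controller now derives two hostname sets per reconciliation: a reachable-only set used to wait for agent registration (failed clusters are skipped so the wait can complete), and a full set used as the desired monitoring list (so hosts on a temporarily failed cluster are not dropped from monitoring). A condensed, self-contained sketch of that split, with made-up names and simplified types:

```go
package main

import "fmt"

// clusterSpec is a simplified stand-in for the operator's cluster spec items.
type clusterSpec struct {
	name    string
	members int
	failed  bool
}

// hostnamesFor mimics getAllHostnames: reachableOnly skips failed clusters.
func hostnamesFor(specs []clusterSpec, reachableOnly bool) []string {
	var hostnames []string
	for _, s := range specs {
		if reachableOnly && s.failed {
			continue // don't wait on agents we can't reach
		}
		for i := 0; i < s.members; i++ {
			hostnames = append(hostnames, fmt.Sprintf("mrs-%s-%d.svc", s.name, i))
		}
	}
	return hostnames
}

func main() {
	specs := []clusterSpec{
		{name: "c1", members: 2},
		{name: "c2", members: 1, failed: true},
	}

	reachable := hostnamesFor(specs, true) // used to wait for agent registration
	all := hostnamesFor(specs, false)      // used as the desired monitoring set

	fmt.Println(reachable) // [mrs-c1-0.svc mrs-c1-1.svc]
	fmt.Println(all)       // [mrs-c1-0.svc mrs-c1-1.svc mrs-c2-0.svc]
}
```
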
